diff options
author | wjaniszewski <wjaniszewski@13f79535-47bb-0310-9956-ffa450edef68> | 2009-04-13 09:41:00 +0000 |
---|---|---|
committer | wjaniszewski <wjaniszewski@13f79535-47bb-0310-9956-ffa450edef68> | 2009-04-13 09:41:00 +0000 |
commit | adcd82c520cafe5744c4256c245c3930955d3491 (patch) | |
tree | ab039112b0fbe1c1ce5186afa1e6c7a41d859452 /branches/sca-java-1.x/modules | |
parent | 0aafa12010ca496809a7fa7e5327d1f90556d399 (diff) |
General improvements for messaging, fixed some issues regarding communication with real Erlang nodes
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@764382 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'branches/sca-java-1.x/modules')
4 files changed, 97 insertions, 76 deletions
diff --git a/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java b/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java index 7cd23aa1b1..6750e292d9 100644 --- a/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java +++ b/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java @@ -81,9 +81,12 @@ public class ErlangInvoker implements Invoker { }
tmpMbox = node.createMbox();
Object[] args = msg.getBody();
- OtpErlangObject msgPayload = TypeHelpersProxy.toErlang(args);
+ // create and send msg with self pid in the beginning
+ OtpErlangObject[] argsArray = { tmpMbox.self(),
+ TypeHelpersProxy.toErlang(args) };
+ OtpErlangObject otpArgs = new OtpErlangTuple(argsArray);
tmpMbox.send(msg.getOperation().getName(), binding.getNode(),
- msgPayload);
+ otpArgs);
if (msg.getOperation().getOutputType() != null) {
OtpMsg resultMsg = null;
if (binding.hasTimeout()) {
@@ -149,12 +152,8 @@ public class ErlangInvoker implements Invoker { reportProblem(msg, e);
msg.setBody(null);
} else if (msg.getOperation().getOutputType() != null) {
- if (result.getClass().equals(OtpErlangTuple.class)) {
- OtpErlangObject resultBody = ((OtpErlangTuple) result)
- .elementAt(1);
- msg.setBody(TypeHelpersProxy.toJava(resultBody, msg
- .getOperation().getOutputType().getPhysical()));
- }
+ msg.setBody(TypeHelpersProxy.toJava(result, msg.getOperation()
+ .getOutputType().getPhysical()));
}
} catch (OtpAuthException e) {
// TODO: externalize message?
diff --git a/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java b/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java index 001256f5f0..696447feef 100644 --- a/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java +++ b/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java @@ -43,9 +43,7 @@ import com.ericsson.otp.erlang.OtpErlangPid; import com.ericsson.otp.erlang.OtpErlangRef; import com.ericsson.otp.erlang.OtpErlangString; import com.ericsson.otp.erlang.OtpErlangTuple; -import com.ericsson.otp.erlang.OtpMbox; import com.ericsson.otp.erlang.OtpMsg; -import com.ericsson.otp.erlang.OtpNode; /** * @version $Rev$ $Date$ @@ -72,17 +70,18 @@ public class ServiceExecutor implements Runnable { private void sendMessage(OtpConnection connection, OtpErlangPid pid, OtpErlangRef ref, OtpErlangAtom head, OtpErlangObject message) throws IOException { - OtpErlangTuple tResult = new OtpErlangTuple(new OtpErlangObject[] { - head, message }); + OtpErlangObject tResult = null; + if (head != null) { + tResult = new OtpErlangTuple( + new OtpErlangObject[] { head, message }); + } else { + tResult = message; + } OtpErlangObject msg = null; msg = new OtpErlangTuple(new OtpErlangObject[] { ref, tResult }); connection.send(pid, msg); } - private String getResponseClientNodeName(OtpErlangPid pid) { - return "_response_connector_to_" + pid + System.currentTimeMillis(); - } - private void handleRpc(OtpMsg msg) { OtpErlangTuple request = null; OtpErlangPid senderPid = null; @@ -148,8 +147,8 @@ public class ServiceExecutor implements Runnable { Object[] arrArg = new Object[] { result }; response = TypeHelpersProxy.toErlang(arrArg); } - sendMessage(connection, senderPid, senderRef, - MessageHelper.ATOM_OK, response); + sendMessage(connection, senderPid, senderRef, null, + response); } catch (Exception e) { if ((e.getClass().equals( InvocationTargetException.class) && e @@ -207,8 +206,24 @@ public class ServiceExecutor implements Runnable { private void handleMsg(OtpMsg msg) { Operation matchedOperation = null; Object args[] = null; + OtpErlangPid senderPid = null; + OtpErlangObject msgNoSender = null; List<Operation> operations = groupedOperations.get(msg .getRecipientName()); + try { + if (msg.getMsg().getClass().equals(OtpErlangTuple.class) + && (((OtpErlangTuple) msg.getMsg()).elementAt(0)) + .getClass().equals(OtpErlangPid.class)) { + senderPid = (OtpErlangPid) ((OtpErlangTuple) msg.getMsg()) + .elementAt(0); + msgNoSender = ((OtpErlangTuple) msg.getMsg()).elementAt(1); + } else { + msgNoSender = msg.getMsg(); + } + } catch (Exception e) { + + } + if (operations == null) { // TODO: externalize message? // NOTE: I assume in Erlang sender doesn't get confirmation so @@ -224,7 +239,7 @@ public class ServiceExecutor implements Runnable { forClasses[i] = iTypes.get(i).getPhysical(); } try { - args = TypeHelpersProxy.toJavaAsArgs(msg.getMsg(), + args = TypeHelpersProxy.toJavaAsArgs(msgNoSender, forClasses); matchedOperation = operation; break; @@ -247,11 +262,12 @@ public class ServiceExecutor implements Runnable { Object[] arrArg = new Object[] { result }; response = TypeHelpersProxy.toErlang(arrArg); } - if (response != null) { - OtpNode node = new OtpNode( - getResponseClientNodeName(msg.getSenderPid())); - OtpMbox mbox = node.createMbox(); - mbox.send(msg.getSenderPid(), response); + if (response != null && senderPid != null) { + connection.send(senderPid, response); + } else if (response != null && senderPid == null) { + // FIXME: cannot send reply - sender didn't provided + // pid. Use PID obtained by jinteface or log this error? + // connection.send(msg.getSenderPid(), response); } } catch (InvocationTargetException e) { // FIXME: use linking feature? send some error? @@ -262,17 +278,11 @@ public class ServiceExecutor implements Runnable { e.printStackTrace(); } } else { - try { - // TODO: externalize message? - // NOTE: don't send error message if mapping not found - logger.log(Level.WARNING, - "No mapping for such arguments in '" - + msg.getRecipientName() - + "' operation in '" + name - + "' node. Recevied arguments: " - + msg.getMsg()); - } catch (OtpErlangDecodeException e) { - } + // TODO: externalize message? + // NOTE: don't send error message if mapping not found + logger.log(Level.WARNING, "No mapping for such arguments in '" + + msg.getRecipientName() + "' operation in '" + name + + "' node. Recevied arguments: " + msgNoSender); } } } diff --git a/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/MboxListener.java b/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/MboxListener.java index 8450fd48a4..817ba2cefa 100644 --- a/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/MboxListener.java +++ b/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/MboxListener.java @@ -21,6 +21,8 @@ package org.apache.tuscany.sca.binding.erlang.testing; import org.apache.tuscany.sca.binding.erlang.impl.types.TypeHelpersProxy; +import com.ericsson.otp.erlang.OtpErlangObject; +import com.ericsson.otp.erlang.OtpErlangTuple; import com.ericsson.otp.erlang.OtpMbox; import com.ericsson.otp.erlang.OtpMsg; @@ -58,22 +60,27 @@ public class MboxListener implements Runnable { } } - public OtpMsg getMsg() { - // Sometimes clients tries to get message which isn't fully received. - // If so - give it more tries. This sometimes caused - // NullPointerException in - // ReferenceServiceTestCase.testMultipleArguments(). - for (int i = 0; i < 3; i++) { - if (msg != null) { - return msg; - } else { - try { - Thread.sleep(100); - } catch (InterruptedException e) { + public OtpErlangObject getMsg() { + try { + // Sometimes clients tries to get message which isn't fully + // received. + // If so - give it more tries. This sometimes caused + // NullPointerException in + // ReferenceServiceTestCase.testMultipleArguments(). + for (int i = 0; i < 3; i++) { + if (msg != null) { + return ((OtpErlangTuple) msg.getMsg()).elementAt(1); + } else { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } } } + return msg.getMsg(); + } catch (Exception e) { + } - return msg; + return null; } - } diff --git a/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/ReferenceServiceTestCase.java b/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/ReferenceServiceTestCase.java index b602590183..1d349888e6 100644 --- a/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/ReferenceServiceTestCase.java +++ b/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/ReferenceServiceTestCase.java @@ -130,7 +130,7 @@ public class ReferenceServiceTestCase { Thread mboxThread = new Thread(mboxListener); mboxThread.start(); String testResult = mboxReference.sendArgs(strArg); - assertEquals(strArg, ((OtpErlangString) mboxListener.getMsg().getMsg()) + assertEquals(strArg, ((OtpErlangString) mboxListener.getMsg()) .stringValue()); assertEquals(strResult, testResult); } @@ -148,8 +148,8 @@ public class ReferenceServiceTestCase { Thread mboxThread = new Thread(mboxListener); mboxThread.start(); boolean testResult = mboxReference.sendArgs(booleanArg); - assertEquals(booleanArg, ((OtpErlangAtom) mboxListener.getMsg() - .getMsg()).booleanValue()); + assertEquals(booleanArg, ((OtpErlangAtom) mboxListener.getMsg()) + .booleanValue()); assertEquals(booleanResult, testResult); } @@ -166,8 +166,8 @@ public class ReferenceServiceTestCase { Thread mboxThread = new Thread(mboxListener); mboxThread.start(); float testResult = mboxReference.sendArgs(floatArg); - assertEquals(floatArg, ((OtpErlangDouble) mboxListener.getMsg() - .getMsg()).doubleValue(), 0); + assertEquals(floatArg, ((OtpErlangDouble) mboxListener.getMsg()) + .doubleValue(), 0); assertEquals(floatResult, testResult, 0); } @@ -184,8 +184,8 @@ public class ReferenceServiceTestCase { Thread mboxThread = new Thread(mboxListener); mboxThread.start(); double testResult = mboxReference.sendArgs(doubleArg); - assertEquals(doubleArg, ((OtpErlangDouble) mboxListener.getMsg() - .getMsg()).doubleValue(), 0); + assertEquals(doubleArg, ((OtpErlangDouble) mboxListener.getMsg()) + .doubleValue(), 0); assertEquals(doubleResult, testResult, 0); } @@ -202,7 +202,7 @@ public class ReferenceServiceTestCase { Thread mboxThread = new Thread(mboxListener); mboxThread.start(); long testResult = mboxReference.sendArgs(longArg); - assertEquals(longArg, ((OtpErlangLong) mboxListener.getMsg().getMsg()) + assertEquals(longArg, ((OtpErlangLong) mboxListener.getMsg()) .longValue(), 0); assertEquals(longResult, testResult, 0); } @@ -220,8 +220,8 @@ public class ReferenceServiceTestCase { Thread mboxThread = new Thread(mboxListener); mboxThread.start(); int testResult = mboxReference.sendArgs(intArg); - assertEquals(intArg, ((OtpErlangLong) mboxListener.getMsg().getMsg()) - .intValue(), 0); + assertEquals(intArg, + ((OtpErlangLong) mboxListener.getMsg()).intValue(), 0); assertEquals(intResult, testResult, 0); } @@ -238,7 +238,7 @@ public class ReferenceServiceTestCase { Thread mboxThread = new Thread(mboxListener); mboxThread.start(); char testResult = mboxReference.sendArgs(charArg); - assertEquals(charArg, ((OtpErlangLong) mboxListener.getMsg().getMsg()) + assertEquals(charArg, ((OtpErlangLong) mboxListener.getMsg()) .charValue(), 0); assertEquals(charResult, testResult, 0); } @@ -256,7 +256,7 @@ public class ReferenceServiceTestCase { Thread mboxThread = new Thread(mboxListener); mboxThread.start(); short testResult = mboxReference.sendArgs(shortArg); - assertEquals(shortArg, ((OtpErlangLong) mboxListener.getMsg().getMsg()) + assertEquals(shortArg, ((OtpErlangLong) mboxListener.getMsg()) .shortValue(), 0); assertEquals(shortResult, testResult, 0); } @@ -274,7 +274,7 @@ public class ReferenceServiceTestCase { Thread mboxThread = new Thread(mboxListener); mboxThread.start(); byte testResult = mboxReference.sendArgs(byteArg); - assertEquals(byteArg, ((OtpErlangLong) mboxListener.getMsg().getMsg()) + assertEquals(byteArg, ((OtpErlangLong) mboxListener.getMsg()) .byteValue(), 0); assertEquals(byteResult, testResult, 0); } @@ -293,10 +293,10 @@ public class ReferenceServiceTestCase { int testInt = 10; mboxReference.sendArgs(testInt, testString); assertEquals(testInt, ((OtpErlangLong) ((OtpErlangTuple) mboxListener - .getMsg().getMsg()).elementAt(0)).longValue()); + .getMsg()).elementAt(0)).longValue()); assertEquals(testString, - ((OtpErlangString) ((OtpErlangTuple) mboxListener.getMsg() - .getMsg()).elementAt(1)).stringValue()); + ((OtpErlangString) ((OtpErlangTuple) mboxListener.getMsg()) + .elementAt(1)).stringValue()); } /** @@ -321,8 +321,7 @@ public class ReferenceServiceTestCase { testArg.arg1.arg2 = "Arg2b"; StructuredTuple testResult = mboxReference.sendArgs(testArg); assertEquals(tupleResult, testResult); - OtpErlangTuple received = (OtpErlangTuple) mboxListener.getMsg() - .getMsg(); + OtpErlangTuple received = (OtpErlangTuple) mboxListener.getMsg(); assertEquals(testArg.arg1.arg1, ((OtpErlangLong) ((OtpErlangTuple) received.elementAt(0)) .elementAt(0)).longValue()); @@ -351,8 +350,7 @@ public class ReferenceServiceTestCase { for (int i = 0; i < testArg.length; i++) { assertEquals(testArg[i], testResult[i]); } - OtpErlangBinary received = (OtpErlangBinary) mboxListener.getMsg() - .getMsg(); + OtpErlangBinary received = (OtpErlangBinary) mboxListener.getMsg(); assertEquals(testArg.length, received.size()); for (int i = 0; i < testArg.length; i++) { assertEquals(testArg[i], received.binaryValue()[i]); @@ -375,7 +373,7 @@ public class ReferenceServiceTestCase { for (int i = 0; i < testArg.length; i++) { assertEquals(testArg[i], testResult[i]); } - OtpErlangList received = (OtpErlangList) mboxListener.getMsg().getMsg(); + OtpErlangList received = (OtpErlangList) mboxListener.getMsg(); assertEquals(testArg.length, received.arity()); for (int i = 0; i < testArg.length; i++) { assertEquals(testArg[i], ((OtpErlangString) received.elementAt(i)) @@ -402,7 +400,7 @@ public class ReferenceServiceTestCase { assertEquals(testArg[i][j], testResult[i][j]); } } - OtpErlangList received = (OtpErlangList) mboxListener.getMsg().getMsg(); + OtpErlangList received = (OtpErlangList) mboxListener.getMsg(); assertEquals(testArg.length, received.arity()); for (int i = 0; i < testArg.length; i++) { for (int j = 0; j < testArg[i].length; j++) { @@ -631,19 +629,22 @@ public class ReferenceServiceTestCase { assertEquals(ErlangException.class, e.getClass()); } } - + /** * Tests mbox with retrieving and answering with basic arguments * * @throws Exception */ - @Test(timeout = 1000) + @Test(timeout = 2000) public void testMbox() throws Exception { OtpErlangObject[] args = new OtpErlangObject[2]; args[0] = new OtpErlangString("world"); args[1] = new OtpErlangString("!"); OtpErlangTuple tuple = new OtpErlangTuple(args); - refMbox.send("sayHello", "RPCServerMbox", tuple); + OtpErlangObject[] argsWithSender = new OtpErlangObject[2]; + argsWithSender[0] = refMbox.self(); + argsWithSender[1] = tuple; + refMbox.send("sayHello", "RPCServerMbox", new OtpErlangTuple(argsWithSender)); OtpErlangString result = (OtpErlangString) refMbox.receiveMsg() .getMsg(); assertEquals("Hello world !", result.stringValue()); @@ -679,7 +680,10 @@ public class ReferenceServiceTestCase { argsContent[0] = structuredTuple; argsContent[1] = list; OtpErlangTuple args = new OtpErlangTuple(argsContent); - refMbox.send("passComplexArgs", "RPCServerMbox", args); + OtpErlangObject[] withSender = new OtpErlangObject[2]; + withSender[0] = refMbox.self(); + withSender[1] = args; + refMbox.send("passComplexArgs", "RPCServerMbox", new OtpErlangTuple(withSender)); OtpErlangObject result = refMbox.receiveMsg().getMsg(); assertEquals(arg1, ((OtpErlangLong) ((OtpErlangTuple) ((OtpErlangTuple) result) @@ -766,6 +770,7 @@ public class ReferenceServiceTestCase { /** * Tests timeout feature for service side bindings + * * @throws Exception */ @Test(timeout = 4000) @@ -811,5 +816,5 @@ public class ReferenceServiceTestCase { cookieModuleReference.sayHellos(); } - + } |