summaryrefslogtreecommitdiffstats
path: root/branches/sca-java-1.x
diff options
context:
space:
mode:
authorwjaniszewski <wjaniszewski@13f79535-47bb-0310-9956-ffa450edef68>2009-04-13 09:41:00 +0000
committerwjaniszewski <wjaniszewski@13f79535-47bb-0310-9956-ffa450edef68>2009-04-13 09:41:00 +0000
commitadcd82c520cafe5744c4256c245c3930955d3491 (patch)
treeab039112b0fbe1c1ce5186afa1e6c7a41d859452 /branches/sca-java-1.x
parent0aafa12010ca496809a7fa7e5327d1f90556d399 (diff)
General improvements for messaging, fixed some issues regarding communication with real Erlang nodes
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@764382 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'branches/sca-java-1.x')
-rw-r--r--branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java15
-rw-r--r--branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java64
-rw-r--r--branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/MboxListener.java35
-rw-r--r--branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/ReferenceServiceTestCase.java59
4 files changed, 97 insertions, 76 deletions
diff --git a/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java b/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java
index 7cd23aa1b1..6750e292d9 100644
--- a/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java
+++ b/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java
@@ -81,9 +81,12 @@ public class ErlangInvoker implements Invoker {
}
tmpMbox = node.createMbox();
Object[] args = msg.getBody();
- OtpErlangObject msgPayload = TypeHelpersProxy.toErlang(args);
+ // create and send msg with self pid in the beginning
+ OtpErlangObject[] argsArray = { tmpMbox.self(),
+ TypeHelpersProxy.toErlang(args) };
+ OtpErlangObject otpArgs = new OtpErlangTuple(argsArray);
tmpMbox.send(msg.getOperation().getName(), binding.getNode(),
- msgPayload);
+ otpArgs);
if (msg.getOperation().getOutputType() != null) {
OtpMsg resultMsg = null;
if (binding.hasTimeout()) {
@@ -149,12 +152,8 @@ public class ErlangInvoker implements Invoker {
reportProblem(msg, e);
msg.setBody(null);
} else if (msg.getOperation().getOutputType() != null) {
- if (result.getClass().equals(OtpErlangTuple.class)) {
- OtpErlangObject resultBody = ((OtpErlangTuple) result)
- .elementAt(1);
- msg.setBody(TypeHelpersProxy.toJava(resultBody, msg
- .getOperation().getOutputType().getPhysical()));
- }
+ msg.setBody(TypeHelpersProxy.toJava(result, msg.getOperation()
+ .getOutputType().getPhysical()));
}
} catch (OtpAuthException e) {
// TODO: externalize message?
diff --git a/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java b/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java
index 001256f5f0..696447feef 100644
--- a/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java
+++ b/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java
@@ -43,9 +43,7 @@ import com.ericsson.otp.erlang.OtpErlangPid;
import com.ericsson.otp.erlang.OtpErlangRef;
import com.ericsson.otp.erlang.OtpErlangString;
import com.ericsson.otp.erlang.OtpErlangTuple;
-import com.ericsson.otp.erlang.OtpMbox;
import com.ericsson.otp.erlang.OtpMsg;
-import com.ericsson.otp.erlang.OtpNode;
/**
* @version $Rev$ $Date$
@@ -72,17 +70,18 @@ public class ServiceExecutor implements Runnable {
private void sendMessage(OtpConnection connection, OtpErlangPid pid,
OtpErlangRef ref, OtpErlangAtom head, OtpErlangObject message)
throws IOException {
- OtpErlangTuple tResult = new OtpErlangTuple(new OtpErlangObject[] {
- head, message });
+ OtpErlangObject tResult = null;
+ if (head != null) {
+ tResult = new OtpErlangTuple(
+ new OtpErlangObject[] { head, message });
+ } else {
+ tResult = message;
+ }
OtpErlangObject msg = null;
msg = new OtpErlangTuple(new OtpErlangObject[] { ref, tResult });
connection.send(pid, msg);
}
- private String getResponseClientNodeName(OtpErlangPid pid) {
- return "_response_connector_to_" + pid + System.currentTimeMillis();
- }
-
private void handleRpc(OtpMsg msg) {
OtpErlangTuple request = null;
OtpErlangPid senderPid = null;
@@ -148,8 +147,8 @@ public class ServiceExecutor implements Runnable {
Object[] arrArg = new Object[] { result };
response = TypeHelpersProxy.toErlang(arrArg);
}
- sendMessage(connection, senderPid, senderRef,
- MessageHelper.ATOM_OK, response);
+ sendMessage(connection, senderPid, senderRef, null,
+ response);
} catch (Exception e) {
if ((e.getClass().equals(
InvocationTargetException.class) && e
@@ -207,8 +206,24 @@ public class ServiceExecutor implements Runnable {
private void handleMsg(OtpMsg msg) {
Operation matchedOperation = null;
Object args[] = null;
+ OtpErlangPid senderPid = null;
+ OtpErlangObject msgNoSender = null;
List<Operation> operations = groupedOperations.get(msg
.getRecipientName());
+ try {
+ if (msg.getMsg().getClass().equals(OtpErlangTuple.class)
+ && (((OtpErlangTuple) msg.getMsg()).elementAt(0))
+ .getClass().equals(OtpErlangPid.class)) {
+ senderPid = (OtpErlangPid) ((OtpErlangTuple) msg.getMsg())
+ .elementAt(0);
+ msgNoSender = ((OtpErlangTuple) msg.getMsg()).elementAt(1);
+ } else {
+ msgNoSender = msg.getMsg();
+ }
+ } catch (Exception e) {
+
+ }
+
if (operations == null) {
// TODO: externalize message?
// NOTE: I assume in Erlang sender doesn't get confirmation so
@@ -224,7 +239,7 @@ public class ServiceExecutor implements Runnable {
forClasses[i] = iTypes.get(i).getPhysical();
}
try {
- args = TypeHelpersProxy.toJavaAsArgs(msg.getMsg(),
+ args = TypeHelpersProxy.toJavaAsArgs(msgNoSender,
forClasses);
matchedOperation = operation;
break;
@@ -247,11 +262,12 @@ public class ServiceExecutor implements Runnable {
Object[] arrArg = new Object[] { result };
response = TypeHelpersProxy.toErlang(arrArg);
}
- if (response != null) {
- OtpNode node = new OtpNode(
- getResponseClientNodeName(msg.getSenderPid()));
- OtpMbox mbox = node.createMbox();
- mbox.send(msg.getSenderPid(), response);
+ if (response != null && senderPid != null) {
+ connection.send(senderPid, response);
+ } else if (response != null && senderPid == null) {
+ // FIXME: cannot send reply - sender didn't provided
+ // pid. Use PID obtained by jinteface or log this error?
+ // connection.send(msg.getSenderPid(), response);
}
} catch (InvocationTargetException e) {
// FIXME: use linking feature? send some error?
@@ -262,17 +278,11 @@ public class ServiceExecutor implements Runnable {
e.printStackTrace();
}
} else {
- try {
- // TODO: externalize message?
- // NOTE: don't send error message if mapping not found
- logger.log(Level.WARNING,
- "No mapping for such arguments in '"
- + msg.getRecipientName()
- + "' operation in '" + name
- + "' node. Recevied arguments: "
- + msg.getMsg());
- } catch (OtpErlangDecodeException e) {
- }
+ // TODO: externalize message?
+ // NOTE: don't send error message if mapping not found
+ logger.log(Level.WARNING, "No mapping for such arguments in '"
+ + msg.getRecipientName() + "' operation in '" + name
+ + "' node. Recevied arguments: " + msgNoSender);
}
}
}
diff --git a/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/MboxListener.java b/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/MboxListener.java
index 8450fd48a4..817ba2cefa 100644
--- a/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/MboxListener.java
+++ b/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/MboxListener.java
@@ -21,6 +21,8 @@ package org.apache.tuscany.sca.binding.erlang.testing;
import org.apache.tuscany.sca.binding.erlang.impl.types.TypeHelpersProxy;
+import com.ericsson.otp.erlang.OtpErlangObject;
+import com.ericsson.otp.erlang.OtpErlangTuple;
import com.ericsson.otp.erlang.OtpMbox;
import com.ericsson.otp.erlang.OtpMsg;
@@ -58,22 +60,27 @@ public class MboxListener implements Runnable {
}
}
- public OtpMsg getMsg() {
- // Sometimes clients tries to get message which isn't fully received.
- // If so - give it more tries. This sometimes caused
- // NullPointerException in
- // ReferenceServiceTestCase.testMultipleArguments().
- for (int i = 0; i < 3; i++) {
- if (msg != null) {
- return msg;
- } else {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
+ public OtpErlangObject getMsg() {
+ try {
+ // Sometimes clients tries to get message which isn't fully
+ // received.
+ // If so - give it more tries. This sometimes caused
+ // NullPointerException in
+ // ReferenceServiceTestCase.testMultipleArguments().
+ for (int i = 0; i < 3; i++) {
+ if (msg != null) {
+ return ((OtpErlangTuple) msg.getMsg()).elementAt(1);
+ } else {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ }
}
}
+ return msg.getMsg();
+ } catch (Exception e) {
+
}
- return msg;
+ return null;
}
-
}
diff --git a/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/ReferenceServiceTestCase.java b/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/ReferenceServiceTestCase.java
index b602590183..1d349888e6 100644
--- a/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/ReferenceServiceTestCase.java
+++ b/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/ReferenceServiceTestCase.java
@@ -130,7 +130,7 @@ public class ReferenceServiceTestCase {
Thread mboxThread = new Thread(mboxListener);
mboxThread.start();
String testResult = mboxReference.sendArgs(strArg);
- assertEquals(strArg, ((OtpErlangString) mboxListener.getMsg().getMsg())
+ assertEquals(strArg, ((OtpErlangString) mboxListener.getMsg())
.stringValue());
assertEquals(strResult, testResult);
}
@@ -148,8 +148,8 @@ public class ReferenceServiceTestCase {
Thread mboxThread = new Thread(mboxListener);
mboxThread.start();
boolean testResult = mboxReference.sendArgs(booleanArg);
- assertEquals(booleanArg, ((OtpErlangAtom) mboxListener.getMsg()
- .getMsg()).booleanValue());
+ assertEquals(booleanArg, ((OtpErlangAtom) mboxListener.getMsg())
+ .booleanValue());
assertEquals(booleanResult, testResult);
}
@@ -166,8 +166,8 @@ public class ReferenceServiceTestCase {
Thread mboxThread = new Thread(mboxListener);
mboxThread.start();
float testResult = mboxReference.sendArgs(floatArg);
- assertEquals(floatArg, ((OtpErlangDouble) mboxListener.getMsg()
- .getMsg()).doubleValue(), 0);
+ assertEquals(floatArg, ((OtpErlangDouble) mboxListener.getMsg())
+ .doubleValue(), 0);
assertEquals(floatResult, testResult, 0);
}
@@ -184,8 +184,8 @@ public class ReferenceServiceTestCase {
Thread mboxThread = new Thread(mboxListener);
mboxThread.start();
double testResult = mboxReference.sendArgs(doubleArg);
- assertEquals(doubleArg, ((OtpErlangDouble) mboxListener.getMsg()
- .getMsg()).doubleValue(), 0);
+ assertEquals(doubleArg, ((OtpErlangDouble) mboxListener.getMsg())
+ .doubleValue(), 0);
assertEquals(doubleResult, testResult, 0);
}
@@ -202,7 +202,7 @@ public class ReferenceServiceTestCase {
Thread mboxThread = new Thread(mboxListener);
mboxThread.start();
long testResult = mboxReference.sendArgs(longArg);
- assertEquals(longArg, ((OtpErlangLong) mboxListener.getMsg().getMsg())
+ assertEquals(longArg, ((OtpErlangLong) mboxListener.getMsg())
.longValue(), 0);
assertEquals(longResult, testResult, 0);
}
@@ -220,8 +220,8 @@ public class ReferenceServiceTestCase {
Thread mboxThread = new Thread(mboxListener);
mboxThread.start();
int testResult = mboxReference.sendArgs(intArg);
- assertEquals(intArg, ((OtpErlangLong) mboxListener.getMsg().getMsg())
- .intValue(), 0);
+ assertEquals(intArg,
+ ((OtpErlangLong) mboxListener.getMsg()).intValue(), 0);
assertEquals(intResult, testResult, 0);
}
@@ -238,7 +238,7 @@ public class ReferenceServiceTestCase {
Thread mboxThread = new Thread(mboxListener);
mboxThread.start();
char testResult = mboxReference.sendArgs(charArg);
- assertEquals(charArg, ((OtpErlangLong) mboxListener.getMsg().getMsg())
+ assertEquals(charArg, ((OtpErlangLong) mboxListener.getMsg())
.charValue(), 0);
assertEquals(charResult, testResult, 0);
}
@@ -256,7 +256,7 @@ public class ReferenceServiceTestCase {
Thread mboxThread = new Thread(mboxListener);
mboxThread.start();
short testResult = mboxReference.sendArgs(shortArg);
- assertEquals(shortArg, ((OtpErlangLong) mboxListener.getMsg().getMsg())
+ assertEquals(shortArg, ((OtpErlangLong) mboxListener.getMsg())
.shortValue(), 0);
assertEquals(shortResult, testResult, 0);
}
@@ -274,7 +274,7 @@ public class ReferenceServiceTestCase {
Thread mboxThread = new Thread(mboxListener);
mboxThread.start();
byte testResult = mboxReference.sendArgs(byteArg);
- assertEquals(byteArg, ((OtpErlangLong) mboxListener.getMsg().getMsg())
+ assertEquals(byteArg, ((OtpErlangLong) mboxListener.getMsg())
.byteValue(), 0);
assertEquals(byteResult, testResult, 0);
}
@@ -293,10 +293,10 @@ public class ReferenceServiceTestCase {
int testInt = 10;
mboxReference.sendArgs(testInt, testString);
assertEquals(testInt, ((OtpErlangLong) ((OtpErlangTuple) mboxListener
- .getMsg().getMsg()).elementAt(0)).longValue());
+ .getMsg()).elementAt(0)).longValue());
assertEquals(testString,
- ((OtpErlangString) ((OtpErlangTuple) mboxListener.getMsg()
- .getMsg()).elementAt(1)).stringValue());
+ ((OtpErlangString) ((OtpErlangTuple) mboxListener.getMsg())
+ .elementAt(1)).stringValue());
}
/**
@@ -321,8 +321,7 @@ public class ReferenceServiceTestCase {
testArg.arg1.arg2 = "Arg2b";
StructuredTuple testResult = mboxReference.sendArgs(testArg);
assertEquals(tupleResult, testResult);
- OtpErlangTuple received = (OtpErlangTuple) mboxListener.getMsg()
- .getMsg();
+ OtpErlangTuple received = (OtpErlangTuple) mboxListener.getMsg();
assertEquals(testArg.arg1.arg1,
((OtpErlangLong) ((OtpErlangTuple) received.elementAt(0))
.elementAt(0)).longValue());
@@ -351,8 +350,7 @@ public class ReferenceServiceTestCase {
for (int i = 0; i < testArg.length; i++) {
assertEquals(testArg[i], testResult[i]);
}
- OtpErlangBinary received = (OtpErlangBinary) mboxListener.getMsg()
- .getMsg();
+ OtpErlangBinary received = (OtpErlangBinary) mboxListener.getMsg();
assertEquals(testArg.length, received.size());
for (int i = 0; i < testArg.length; i++) {
assertEquals(testArg[i], received.binaryValue()[i]);
@@ -375,7 +373,7 @@ public class ReferenceServiceTestCase {
for (int i = 0; i < testArg.length; i++) {
assertEquals(testArg[i], testResult[i]);
}
- OtpErlangList received = (OtpErlangList) mboxListener.getMsg().getMsg();
+ OtpErlangList received = (OtpErlangList) mboxListener.getMsg();
assertEquals(testArg.length, received.arity());
for (int i = 0; i < testArg.length; i++) {
assertEquals(testArg[i], ((OtpErlangString) received.elementAt(i))
@@ -402,7 +400,7 @@ public class ReferenceServiceTestCase {
assertEquals(testArg[i][j], testResult[i][j]);
}
}
- OtpErlangList received = (OtpErlangList) mboxListener.getMsg().getMsg();
+ OtpErlangList received = (OtpErlangList) mboxListener.getMsg();
assertEquals(testArg.length, received.arity());
for (int i = 0; i < testArg.length; i++) {
for (int j = 0; j < testArg[i].length; j++) {
@@ -631,19 +629,22 @@ public class ReferenceServiceTestCase {
assertEquals(ErlangException.class, e.getClass());
}
}
-
+
/**
* Tests mbox with retrieving and answering with basic arguments
*
* @throws Exception
*/
- @Test(timeout = 1000)
+ @Test(timeout = 2000)
public void testMbox() throws Exception {
OtpErlangObject[] args = new OtpErlangObject[2];
args[0] = new OtpErlangString("world");
args[1] = new OtpErlangString("!");
OtpErlangTuple tuple = new OtpErlangTuple(args);
- refMbox.send("sayHello", "RPCServerMbox", tuple);
+ OtpErlangObject[] argsWithSender = new OtpErlangObject[2];
+ argsWithSender[0] = refMbox.self();
+ argsWithSender[1] = tuple;
+ refMbox.send("sayHello", "RPCServerMbox", new OtpErlangTuple(argsWithSender));
OtpErlangString result = (OtpErlangString) refMbox.receiveMsg()
.getMsg();
assertEquals("Hello world !", result.stringValue());
@@ -679,7 +680,10 @@ public class ReferenceServiceTestCase {
argsContent[0] = structuredTuple;
argsContent[1] = list;
OtpErlangTuple args = new OtpErlangTuple(argsContent);
- refMbox.send("passComplexArgs", "RPCServerMbox", args);
+ OtpErlangObject[] withSender = new OtpErlangObject[2];
+ withSender[0] = refMbox.self();
+ withSender[1] = args;
+ refMbox.send("passComplexArgs", "RPCServerMbox", new OtpErlangTuple(withSender));
OtpErlangObject result = refMbox.receiveMsg().getMsg();
assertEquals(arg1,
((OtpErlangLong) ((OtpErlangTuple) ((OtpErlangTuple) result)
@@ -766,6 +770,7 @@ public class ReferenceServiceTestCase {
/**
* Tests timeout feature for service side bindings
+ *
* @throws Exception
*/
@Test(timeout = 4000)
@@ -811,5 +816,5 @@ public class ReferenceServiceTestCase {
cookieModuleReference.sayHellos();
}
-
+
}