From adcd82c520cafe5744c4256c245c3930955d3491 Mon Sep 17 00:00:00 2001 From: wjaniszewski Date: Mon, 13 Apr 2009 09:41:00 +0000 Subject: General improvements for messaging, fixed some issues regarding communication with real Erlang nodes git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@764382 13f79535-47bb-0310-9956-ffa450edef68 --- .../sca/binding/erlang/impl/ErlangInvoker.java | 15 +++-- .../sca/binding/erlang/impl/ServiceExecutor.java | 64 +++++++++++++--------- 2 files changed, 44 insertions(+), 35 deletions(-) (limited to 'branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org') diff --git a/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java b/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java index 7cd23aa1b1..6750e292d9 100644 --- a/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java +++ b/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java @@ -81,9 +81,12 @@ public class ErlangInvoker implements Invoker { } tmpMbox = node.createMbox(); Object[] args = msg.getBody(); - OtpErlangObject msgPayload = TypeHelpersProxy.toErlang(args); + // create and send msg with self pid in the beginning + OtpErlangObject[] argsArray = { tmpMbox.self(), + TypeHelpersProxy.toErlang(args) }; + OtpErlangObject otpArgs = new OtpErlangTuple(argsArray); tmpMbox.send(msg.getOperation().getName(), binding.getNode(), - msgPayload); + otpArgs); if (msg.getOperation().getOutputType() != null) { OtpMsg resultMsg = null; if (binding.hasTimeout()) { @@ -149,12 +152,8 @@ public class ErlangInvoker implements Invoker { reportProblem(msg, e); msg.setBody(null); } else if (msg.getOperation().getOutputType() != null) { - if (result.getClass().equals(OtpErlangTuple.class)) { - OtpErlangObject resultBody = ((OtpErlangTuple) result) - .elementAt(1); - msg.setBody(TypeHelpersProxy.toJava(resultBody, msg - .getOperation().getOutputType().getPhysical())); - } + msg.setBody(TypeHelpersProxy.toJava(result, msg.getOperation() + .getOutputType().getPhysical())); } } catch (OtpAuthException e) { // TODO: externalize message? diff --git a/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java b/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java index 001256f5f0..696447feef 100644 --- a/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java +++ b/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java @@ -43,9 +43,7 @@ import com.ericsson.otp.erlang.OtpErlangPid; import com.ericsson.otp.erlang.OtpErlangRef; import com.ericsson.otp.erlang.OtpErlangString; import com.ericsson.otp.erlang.OtpErlangTuple; -import com.ericsson.otp.erlang.OtpMbox; import com.ericsson.otp.erlang.OtpMsg; -import com.ericsson.otp.erlang.OtpNode; /** * @version $Rev$ $Date$ @@ -72,17 +70,18 @@ public class ServiceExecutor implements Runnable { private void sendMessage(OtpConnection connection, OtpErlangPid pid, OtpErlangRef ref, OtpErlangAtom head, OtpErlangObject message) throws IOException { - OtpErlangTuple tResult = new OtpErlangTuple(new OtpErlangObject[] { - head, message }); + OtpErlangObject tResult = null; + if (head != null) { + tResult = new OtpErlangTuple( + new OtpErlangObject[] { head, message }); + } else { + tResult = message; + } OtpErlangObject msg = null; msg = new OtpErlangTuple(new OtpErlangObject[] { ref, tResult }); connection.send(pid, msg); } - private String getResponseClientNodeName(OtpErlangPid pid) { - return "_response_connector_to_" + pid + System.currentTimeMillis(); - } - private void handleRpc(OtpMsg msg) { OtpErlangTuple request = null; OtpErlangPid senderPid = null; @@ -148,8 +147,8 @@ public class ServiceExecutor implements Runnable { Object[] arrArg = new Object[] { result }; response = TypeHelpersProxy.toErlang(arrArg); } - sendMessage(connection, senderPid, senderRef, - MessageHelper.ATOM_OK, response); + sendMessage(connection, senderPid, senderRef, null, + response); } catch (Exception e) { if ((e.getClass().equals( InvocationTargetException.class) && e @@ -207,8 +206,24 @@ public class ServiceExecutor implements Runnable { private void handleMsg(OtpMsg msg) { Operation matchedOperation = null; Object args[] = null; + OtpErlangPid senderPid = null; + OtpErlangObject msgNoSender = null; List operations = groupedOperations.get(msg .getRecipientName()); + try { + if (msg.getMsg().getClass().equals(OtpErlangTuple.class) + && (((OtpErlangTuple) msg.getMsg()).elementAt(0)) + .getClass().equals(OtpErlangPid.class)) { + senderPid = (OtpErlangPid) ((OtpErlangTuple) msg.getMsg()) + .elementAt(0); + msgNoSender = ((OtpErlangTuple) msg.getMsg()).elementAt(1); + } else { + msgNoSender = msg.getMsg(); + } + } catch (Exception e) { + + } + if (operations == null) { // TODO: externalize message? // NOTE: I assume in Erlang sender doesn't get confirmation so @@ -224,7 +239,7 @@ public class ServiceExecutor implements Runnable { forClasses[i] = iTypes.get(i).getPhysical(); } try { - args = TypeHelpersProxy.toJavaAsArgs(msg.getMsg(), + args = TypeHelpersProxy.toJavaAsArgs(msgNoSender, forClasses); matchedOperation = operation; break; @@ -247,11 +262,12 @@ public class ServiceExecutor implements Runnable { Object[] arrArg = new Object[] { result }; response = TypeHelpersProxy.toErlang(arrArg); } - if (response != null) { - OtpNode node = new OtpNode( - getResponseClientNodeName(msg.getSenderPid())); - OtpMbox mbox = node.createMbox(); - mbox.send(msg.getSenderPid(), response); + if (response != null && senderPid != null) { + connection.send(senderPid, response); + } else if (response != null && senderPid == null) { + // FIXME: cannot send reply - sender didn't provided + // pid. Use PID obtained by jinteface or log this error? + // connection.send(msg.getSenderPid(), response); } } catch (InvocationTargetException e) { // FIXME: use linking feature? send some error? @@ -262,17 +278,11 @@ public class ServiceExecutor implements Runnable { e.printStackTrace(); } } else { - try { - // TODO: externalize message? - // NOTE: don't send error message if mapping not found - logger.log(Level.WARNING, - "No mapping for such arguments in '" - + msg.getRecipientName() - + "' operation in '" + name - + "' node. Recevied arguments: " - + msg.getMsg()); - } catch (OtpErlangDecodeException e) { - } + // TODO: externalize message? + // NOTE: don't send error message if mapping not found + logger.log(Level.WARNING, "No mapping for such arguments in '" + + msg.getRecipientName() + "' operation in '" + name + + "' node. Recevied arguments: " + msgNoSender); } } } -- cgit v1.2.3