diff options
author | wjaniszewski <wjaniszewski@13f79535-47bb-0310-9956-ffa450edef68> | 2009-03-20 14:23:51 +0000 |
---|---|---|
committer | wjaniszewski <wjaniszewski@13f79535-47bb-0310-9956-ffa450edef68> | 2009-03-20 14:23:51 +0000 |
commit | fce6ad4c30b15b4284a94e7a00e084bf18748cf2 (patch) | |
tree | a49756814cc819ecb6ca2ccecfedb3f9969b274e /sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java | |
parent | e962b3bbdff3995b6871393755ccd070b78aa329 (diff) |
Added configurable timeout and cookies for service bindings. Consequence is that one SCA-Erlang node is created excusively for one service binding, so service bindings cannot share the same value in 'node' attribute like it was before. Made some other fixes.
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@756480 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java')
-rw-r--r-- | sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java | 92 |
1 files changed, 59 insertions, 33 deletions
diff --git a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java index a03d522860..001256f5f0 100644 --- a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java +++ b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java @@ -35,6 +35,7 @@ import org.apache.tuscany.sca.runtime.RuntimeComponentService; import com.ericsson.otp.erlang.OtpAuthException; import com.ericsson.otp.erlang.OtpConnection; import com.ericsson.otp.erlang.OtpErlangAtom; +import com.ericsson.otp.erlang.OtpErlangDecodeException; import com.ericsson.otp.erlang.OtpErlangExit; import com.ericsson.otp.erlang.OtpErlangList; import com.ericsson.otp.erlang.OtpErlangObject; @@ -53,22 +54,18 @@ public class ServiceExecutor implements Runnable { private static final Logger logger = Logger.getLogger(ServiceExecutor.class .getName()); - private static final long RECEIVE_TIMEOUT = 60000; - private Map<String, ErlangNodeElement> erlangModules; - private ErlangNodeElement erlangMbox; + private ErlangNodeElement nodeElement; private OtpConnection connection; private Map<String, List<Operation>> groupedOperations; private String name; public ServiceExecutor(OtpConnection connection, Map<String, List<Operation>> groupedOperations, - Map<String, ErlangNodeElement> erlangModules, - ErlangNodeElement erlangMbox, String name) { - this.erlangModules = erlangModules; + ErlangNodeElement nodeElement, String name) { this.connection = connection; this.groupedOperations = groupedOperations; - this.erlangMbox = erlangMbox; + this.nodeElement = nodeElement; this.name = name; } @@ -82,6 +79,10 @@ public class ServiceExecutor implements Runnable { connection.send(pid, msg); } + private String getResponseClientNodeName(OtpErlangPid pid) { + return "_response_connector_to_" + pid + System.currentTimeMillis(); + } + private void handleRpc(OtpMsg msg) { OtpErlangTuple request = null; OtpErlangPid senderPid = null; @@ -103,7 +104,7 @@ public class ServiceExecutor implements Runnable { } else { argsList = new OtpErlangList(args); } - if (!erlangModules.containsKey(module)) { + if (!nodeElement.getBinding().getModule().equals(module)) { // TODO: externalize message? OtpErlangObject errorMsg = MessageHelper.functionUndefMessage( module, function, argsList, @@ -111,9 +112,8 @@ public class ServiceExecutor implements Runnable { sendMessage(connection, senderPid, senderRef, MessageHelper.ATOM_BADRPC, errorMsg); } else { - RuntimeComponentService service = erlangModules.get(module) - .getService(); - ErlangBinding binding = erlangModules.get(module).getBinding(); + RuntimeComponentService service = nodeElement.getService(); + ErlangBinding binding = nodeElement.getBinding(); List<Operation> operations = service.getInterfaceContract() .getInterface().getOperations(); Operation operation = null; @@ -177,6 +177,18 @@ public class ServiceExecutor implements Runnable { MessageHelper.ATOM_BADRPC, errorMsg); } } + } catch (ClassCastException e) { + // TODO: externalize message? + try { + logger + .log( + Level.WARNING, + "On node '" + + nodeElement.getBinding().getNode() + + "' received RPC request which is invalid. Request content is: " + + msg.getMsg()); + } catch (OtpErlangDecodeException e1) { + } } catch (Exception e) { try { sendMessage(connection, senderPid, senderRef, @@ -184,11 +196,10 @@ public class ServiceExecutor implements Runnable { "Unhandled error while processing request: " + e.getClass().getCanonicalName() + ", message: " + e.getMessage())); - } catch (IOException e1) { + } catch (Exception e1) { // error while sending error message. Can't do anything now logger.log(Level.WARNING, "Error during sending error message", e); - e.printStackTrace(); } } } @@ -201,7 +212,7 @@ public class ServiceExecutor implements Runnable { if (operations == null) { // TODO: externalize message? // NOTE: I assume in Erlang sender doesn't get confirmation so - // message will be send + // no message will be send logger.log(Level.WARNING, "Node '" + name + "' received message addressed to non exising mbox: " + msg.getRecipientName()); @@ -224,8 +235,8 @@ public class ServiceExecutor implements Runnable { } if (matchedOperation != null) { try { - Object result = erlangMbox.getService().getRuntimeWire( - erlangMbox.getBinding()).invoke(matchedOperation, + Object result = nodeElement.getService().getRuntimeWire( + nodeElement.getBinding()).invoke(matchedOperation, args); OtpErlangObject response = null; if (matchedOperation.getOutputType() != null @@ -237,50 +248,65 @@ public class ServiceExecutor implements Runnable { response = TypeHelpersProxy.toErlang(arrArg); } if (response != null) { - OtpNode node = new OtpNode("_response_connector_to_" - + msg.getSenderPid()); + OtpNode node = new OtpNode( + getResponseClientNodeName(msg.getSenderPid())); OtpMbox mbox = node.createMbox(); mbox.send(msg.getSenderPid(), response); } } catch (InvocationTargetException e) { // FIXME: use linking feature? send some error? e.printStackTrace(); - } catch (IOException e) { + // } catch (IOException e) { + } catch (Exception e) { // FIXME: log this problem? use linking feature? send error? e.printStackTrace(); } } else { - // TODO: externalize message? - // NOTE: don't send error message if mapping not found - logger.log(Level.WARNING, "No mapping for such arguments in '" - + msg.getRecipientName() + "' operation in '" + name - + "' node."); + try { + // TODO: externalize message? + // NOTE: don't send error message if mapping not found + logger.log(Level.WARNING, + "No mapping for such arguments in '" + + msg.getRecipientName() + + "' operation in '" + name + + "' node. Recevied arguments: " + + msg.getMsg()); + } catch (OtpErlangDecodeException e) { + } } } } public void run() { try { - // TODO: should receive timeout be configured in .composite file? - OtpMsg msg = connection.receiveMsg(RECEIVE_TIMEOUT); - if (msg.getRecipientName().equals(MessageHelper.RPC_MBOX)) { + // NOTE: there's also a timeout, like in reference bindings + OtpMsg msg = null; + if (nodeElement.getBinding().hasTimeout()) { + msg = connection.receiveMsg(nodeElement.getBinding() + .getTimeout()); + } else { + msg = connection.receiveMsg(); + } + if (msg.getRecipientName().equals(MessageHelper.RPC_MBOX) + && !nodeElement.getBinding().isMbox()) { handleRpc(msg); - } else if (msg != null) { + } else if (!msg.getRecipientName().equals(MessageHelper.RPC_MBOX) + && nodeElement.getBinding().isMbox()) { handleMsg(msg); } else { - // message receive timeout + // received wrong message type } } catch (IOException e) { // TODO: externalize message? logger.log(Level.WARNING, "Problem while receiving message", e); } catch (OtpErlangExit e) { // TODO: linking? - e.printStackTrace(); } catch (OtpAuthException e) { - // TODO: cookies? + // TODO: cookies? does this exception occur sometime? } catch (InterruptedException e) { - // TODO: when it could happen? - e.printStackTrace(); + // NOTE: timeout will be logged + // TODO: externalize message? + logger.log(Level.WARNING, "Timeout while waiting for request", e); } finally { connection.close(); } |