summaryrefslogtreecommitdiffstats
path: root/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java
diff options
context:
space:
mode:
Diffstat (limited to 'sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java')
-rw-r--r--sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java92
1 files changed, 59 insertions, 33 deletions
diff --git a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java
index a03d522860..001256f5f0 100644
--- a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java
+++ b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java
@@ -35,6 +35,7 @@ import org.apache.tuscany.sca.runtime.RuntimeComponentService;
import com.ericsson.otp.erlang.OtpAuthException;
import com.ericsson.otp.erlang.OtpConnection;
import com.ericsson.otp.erlang.OtpErlangAtom;
+import com.ericsson.otp.erlang.OtpErlangDecodeException;
import com.ericsson.otp.erlang.OtpErlangExit;
import com.ericsson.otp.erlang.OtpErlangList;
import com.ericsson.otp.erlang.OtpErlangObject;
@@ -53,22 +54,18 @@ public class ServiceExecutor implements Runnable {
private static final Logger logger = Logger.getLogger(ServiceExecutor.class
.getName());
- private static final long RECEIVE_TIMEOUT = 60000;
- private Map<String, ErlangNodeElement> erlangModules;
- private ErlangNodeElement erlangMbox;
+ private ErlangNodeElement nodeElement;
private OtpConnection connection;
private Map<String, List<Operation>> groupedOperations;
private String name;
public ServiceExecutor(OtpConnection connection,
Map<String, List<Operation>> groupedOperations,
- Map<String, ErlangNodeElement> erlangModules,
- ErlangNodeElement erlangMbox, String name) {
- this.erlangModules = erlangModules;
+ ErlangNodeElement nodeElement, String name) {
this.connection = connection;
this.groupedOperations = groupedOperations;
- this.erlangMbox = erlangMbox;
+ this.nodeElement = nodeElement;
this.name = name;
}
@@ -82,6 +79,10 @@ public class ServiceExecutor implements Runnable {
connection.send(pid, msg);
}
+ private String getResponseClientNodeName(OtpErlangPid pid) {
+ return "_response_connector_to_" + pid + System.currentTimeMillis();
+ }
+
private void handleRpc(OtpMsg msg) {
OtpErlangTuple request = null;
OtpErlangPid senderPid = null;
@@ -103,7 +104,7 @@ public class ServiceExecutor implements Runnable {
} else {
argsList = new OtpErlangList(args);
}
- if (!erlangModules.containsKey(module)) {
+ if (!nodeElement.getBinding().getModule().equals(module)) {
// TODO: externalize message?
OtpErlangObject errorMsg = MessageHelper.functionUndefMessage(
module, function, argsList,
@@ -111,9 +112,8 @@ public class ServiceExecutor implements Runnable {
sendMessage(connection, senderPid, senderRef,
MessageHelper.ATOM_BADRPC, errorMsg);
} else {
- RuntimeComponentService service = erlangModules.get(module)
- .getService();
- ErlangBinding binding = erlangModules.get(module).getBinding();
+ RuntimeComponentService service = nodeElement.getService();
+ ErlangBinding binding = nodeElement.getBinding();
List<Operation> operations = service.getInterfaceContract()
.getInterface().getOperations();
Operation operation = null;
@@ -177,6 +177,18 @@ public class ServiceExecutor implements Runnable {
MessageHelper.ATOM_BADRPC, errorMsg);
}
}
+ } catch (ClassCastException e) {
+ // TODO: externalize message?
+ try {
+ logger
+ .log(
+ Level.WARNING,
+ "On node '"
+ + nodeElement.getBinding().getNode()
+ + "' received RPC request which is invalid. Request content is: "
+ + msg.getMsg());
+ } catch (OtpErlangDecodeException e1) {
+ }
} catch (Exception e) {
try {
sendMessage(connection, senderPid, senderRef,
@@ -184,11 +196,10 @@ public class ServiceExecutor implements Runnable {
"Unhandled error while processing request: "
+ e.getClass().getCanonicalName()
+ ", message: " + e.getMessage()));
- } catch (IOException e1) {
+ } catch (Exception e1) {
// error while sending error message. Can't do anything now
logger.log(Level.WARNING, "Error during sending error message",
e);
- e.printStackTrace();
}
}
}
@@ -201,7 +212,7 @@ public class ServiceExecutor implements Runnable {
if (operations == null) {
// TODO: externalize message?
// NOTE: I assume in Erlang sender doesn't get confirmation so
- // message will be send
+ // no message will be send
logger.log(Level.WARNING, "Node '" + name
+ "' received message addressed to non exising mbox: "
+ msg.getRecipientName());
@@ -224,8 +235,8 @@ public class ServiceExecutor implements Runnable {
}
if (matchedOperation != null) {
try {
- Object result = erlangMbox.getService().getRuntimeWire(
- erlangMbox.getBinding()).invoke(matchedOperation,
+ Object result = nodeElement.getService().getRuntimeWire(
+ nodeElement.getBinding()).invoke(matchedOperation,
args);
OtpErlangObject response = null;
if (matchedOperation.getOutputType() != null
@@ -237,50 +248,65 @@ public class ServiceExecutor implements Runnable {
response = TypeHelpersProxy.toErlang(arrArg);
}
if (response != null) {
- OtpNode node = new OtpNode("_response_connector_to_"
- + msg.getSenderPid());
+ OtpNode node = new OtpNode(
+ getResponseClientNodeName(msg.getSenderPid()));
OtpMbox mbox = node.createMbox();
mbox.send(msg.getSenderPid(), response);
}
} catch (InvocationTargetException e) {
// FIXME: use linking feature? send some error?
e.printStackTrace();
- } catch (IOException e) {
+ // } catch (IOException e) {
+ } catch (Exception e) {
// FIXME: log this problem? use linking feature? send error?
e.printStackTrace();
}
} else {
- // TODO: externalize message?
- // NOTE: don't send error message if mapping not found
- logger.log(Level.WARNING, "No mapping for such arguments in '"
- + msg.getRecipientName() + "' operation in '" + name
- + "' node.");
+ try {
+ // TODO: externalize message?
+ // NOTE: don't send error message if mapping not found
+ logger.log(Level.WARNING,
+ "No mapping for such arguments in '"
+ + msg.getRecipientName()
+ + "' operation in '" + name
+ + "' node. Recevied arguments: "
+ + msg.getMsg());
+ } catch (OtpErlangDecodeException e) {
+ }
}
}
}
public void run() {
try {
- // TODO: should receive timeout be configured in .composite file?
- OtpMsg msg = connection.receiveMsg(RECEIVE_TIMEOUT);
- if (msg.getRecipientName().equals(MessageHelper.RPC_MBOX)) {
+ // NOTE: there's also a timeout, like in reference bindings
+ OtpMsg msg = null;
+ if (nodeElement.getBinding().hasTimeout()) {
+ msg = connection.receiveMsg(nodeElement.getBinding()
+ .getTimeout());
+ } else {
+ msg = connection.receiveMsg();
+ }
+ if (msg.getRecipientName().equals(MessageHelper.RPC_MBOX)
+ && !nodeElement.getBinding().isMbox()) {
handleRpc(msg);
- } else if (msg != null) {
+ } else if (!msg.getRecipientName().equals(MessageHelper.RPC_MBOX)
+ && nodeElement.getBinding().isMbox()) {
handleMsg(msg);
} else {
- // message receive timeout
+ // received wrong message type
}
} catch (IOException e) {
// TODO: externalize message?
logger.log(Level.WARNING, "Problem while receiving message", e);
} catch (OtpErlangExit e) {
// TODO: linking?
- e.printStackTrace();
} catch (OtpAuthException e) {
- // TODO: cookies?
+ // TODO: cookies? does this exception occur sometime?
} catch (InterruptedException e) {
- // TODO: when it could happen?
- e.printStackTrace();
+ // NOTE: timeout will be logged
+ // TODO: externalize message?
+ logger.log(Level.WARNING, "Timeout while waiting for request", e);
} finally {
connection.close();
}