summaryrefslogtreecommitdiffstats
path: root/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/RpcExecutor.java
diff options
context:
space:
mode:
authorwjaniszewski <wjaniszewski@13f79535-47bb-0310-9956-ffa450edef68>2009-03-15 11:19:14 +0000
committerwjaniszewski <wjaniszewski@13f79535-47bb-0310-9956-ffa450edef68>2009-03-15 11:19:14 +0000
commitfdfa3d0df9dc79314411bf455cc71c5677925e1b (patch)
tree408538346626c7b78bd13a4092ae323362ebe124 /sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/RpcExecutor.java
parentb44951427248e4cf764e2533115272ea17eb9b5f (diff)
Enabled exposing SCA components as Erlang message boxes.
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@754654 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to '')
-rw-r--r--sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java (renamed from sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/RpcExecutor.java)134
1 files changed, 110 insertions, 24 deletions
diff --git a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/RpcExecutor.java b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java
index 03e83cec61..63c58cb696 100644
--- a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/RpcExecutor.java
+++ b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java
@@ -38,22 +38,30 @@ import com.ericsson.otp.erlang.OtpErlangPid;
import com.ericsson.otp.erlang.OtpErlangRef;
import com.ericsson.otp.erlang.OtpErlangString;
import com.ericsson.otp.erlang.OtpErlangTuple;
+import com.ericsson.otp.erlang.OtpMbox;
+import com.ericsson.otp.erlang.OtpMsg;
+import com.ericsson.otp.erlang.OtpNode;
-public class RpcExecutor implements Runnable {
+/**
+ * @version $Rev$ $Date$
+ */
+public class ServiceExecutor implements Runnable {
- private Map<String, RuntimeComponentService> services;
- private Map<String, ErlangBinding> bindings;
- private OtpConnection connection;
+ private static final long RECEIVE_TIMEOUT = 60000;
- private static final OtpErlangAtom OK = new OtpErlangAtom("ok");
- private static final OtpErlangAtom ERROR = new OtpErlangAtom("error");
- private static final OtpErlangAtom BADRPC = new OtpErlangAtom("badrpc");
+ private Map<String, ErlangNodeElement> erlangModules;
+ private ErlangNodeElement erlangMbox;
+ private OtpConnection connection;
+ private Map<String, List<Operation>> groupedOperations;
- public RpcExecutor(Map<String, RuntimeComponentService> services, Map<String, ErlangBinding> bindings,
- OtpConnection connection) {
- this.bindings = bindings;
- this.services = services;
+ public ServiceExecutor(OtpConnection connection,
+ Map<String, List<Operation>> groupedOperations,
+ Map<String, ErlangNodeElement> erlangModules,
+ ErlangNodeElement erlangMbox) {
+ this.erlangModules = erlangModules;
this.connection = connection;
+ this.groupedOperations = groupedOperations;
+ this.erlangMbox = erlangMbox;
}
private void sendMessage(OtpConnection connection, OtpErlangPid pid,
@@ -66,12 +74,12 @@ public class RpcExecutor implements Runnable {
connection.send(pid, msg);
}
- public void run() {
+ private void handleRpc(OtpMsg msg) {
OtpErlangTuple request = null;
OtpErlangPid senderPid = null;
OtpErlangRef senderRef = null;
try {
- OtpErlangTuple call = (OtpErlangTuple) connection.receive();
+ OtpErlangTuple call = (OtpErlangTuple) msg.getMsg();
OtpErlangTuple from = (OtpErlangTuple) call.elementAt(1);
request = (OtpErlangTuple) call.elementAt(2);
senderPid = (OtpErlangPid) from.elementAt(0);
@@ -81,20 +89,23 @@ public class RpcExecutor implements Runnable {
.atomValue();
OtpErlangObject args = request.elementAt(3);
OtpErlangList argsList = null;
+ // normalize input
if (args instanceof OtpErlangList) {
argsList = (OtpErlangList) args;
} else {
argsList = new OtpErlangList(args);
}
- if (!services.containsKey(module)) {
+ if (!erlangModules.containsKey(module)) {
// TODO: externalize message?
OtpErlangObject errorMsg = MessageHelper.functionUndefMessage(
module, function, argsList,
"Module not found in SCA component.");
- sendMessage(connection, senderPid, senderRef, BADRPC, errorMsg);
+ sendMessage(connection, senderPid, senderRef,
+ MessageHelper.ATOM_BADRPC, errorMsg);
} else {
- RuntimeComponentService service = services.get(module);
- ErlangBinding binding = bindings.get(module);
+ RuntimeComponentService service = erlangModules.get(module)
+ .getService();
+ ErlangBinding binding = erlangModules.get(module).getBinding();
List<Operation> operations = service.getInterfaceContract()
.getInterface().getOperations();
Operation operation = null;
@@ -128,8 +139,8 @@ public class RpcExecutor implements Runnable {
Object[] arrArg = new Object[] { result };
response = TypeHelpersProxy.toErlang(arrArg);
}
- sendMessage(connection, senderPid, senderRef, OK,
- response);
+ sendMessage(connection, senderPid, senderRef,
+ MessageHelper.ATOM_OK, response);
} catch (Exception e) {
if ((e.getClass().equals(
InvocationTargetException.class) && e
@@ -143,7 +154,7 @@ public class RpcExecutor implements Runnable {
argsList,
"Operation name found in SCA component, but parameters types didn't match.");
sendMessage(connection, senderPid, senderRef,
- BADRPC, errorMsg);
+ MessageHelper.ATOM_BADRPC, errorMsg);
} else {
throw e;
}
@@ -153,22 +164,97 @@ public class RpcExecutor implements Runnable {
OtpErlangObject errorMsg = MessageHelper
.functionUndefMessage(module, function, argsList,
"Operation name not found in SCA component.");
- sendMessage(connection, senderPid, senderRef, BADRPC,
- errorMsg);
+ sendMessage(connection, senderPid, senderRef,
+ MessageHelper.ATOM_BADRPC, errorMsg);
}
}
} catch (Exception e) {
// TODO: distinguish and describe errors!
try {
e.printStackTrace();
- sendMessage(connection, senderPid, senderRef, ERROR,
- new OtpErlangString(
+ sendMessage(connection, senderPid, senderRef,
+ MessageHelper.ATOM_ERROR, new OtpErlangString(
"Unhandled error while processing request: "
+ e.getClass().getCanonicalName()
+ ", message: " + e.getMessage()));
} catch (IOException e1) {
// error while sending error message. Can't do anything now
}
+ }
+ }
+
+ private void handleMsg(OtpMsg msg) {
+ Operation matchedOperation = null;
+ Object args[] = null;
+ List<Operation> operations = groupedOperations.get(msg
+ .getRecipientName());
+ if (operations == null) {
+ // TODO: no such mbox, send error message?
+ } else {
+ for (Operation operation : operations) {
+ List<DataType> iTypes = operation.getInputType().getLogical();
+ Class<?>[] forClasses = new Class<?>[iTypes.size()];
+ for (int i = 0; i < iTypes.size(); i++) {
+ forClasses[i] = iTypes.get(i).getPhysical();
+ }
+ try {
+ args = TypeHelpersProxy.toJavaAsArgs(msg.getMsg(),
+ forClasses);
+ matchedOperation = operation;
+ break;
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ if (matchedOperation != null) {
+ try {
+ Object result = erlangMbox.getService().getRuntimeWire(
+ erlangMbox.getBinding()).invoke(matchedOperation,
+ args);
+ OtpErlangObject response = null;
+ if (matchedOperation.getOutputType() != null
+ && matchedOperation.getOutputType().getPhysical()
+ .isArray()) {
+ response = TypeHelpersProxy.toErlangAsList(result);
+ } else if (matchedOperation.getOutputType() != null) {
+ Object[] arrArg = new Object[] { result };
+ response = TypeHelpersProxy.toErlang(arrArg);
+ }
+ if (response != null) {
+ OtpNode node = new OtpNode("_response_connector_to_"
+ + msg.getSenderPid());
+ OtpMbox mbox = node.createMbox();
+ mbox.send(msg.getSenderPid(), response);
+ }
+ } catch (InvocationTargetException e) {
+ // TODO send some error?
+ e.printStackTrace();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ } else {
+ // TODO: send some error - no mapping for such arguments
+ System.out
+ .println("TODO: send some error - no mapping for such arguments");
+ }
+ }
+ }
+
+ public void run() {
+ try {
+ OtpMsg msg = connection.receiveMsg(RECEIVE_TIMEOUT);
+ if (msg.getRecipientName().equals(MessageHelper.RPC_MBOX)) {
+ handleRpc(msg);
+ } else if (msg != null) {
+ handleMsg(msg);
+ } else {
+ // message receive timeout
+ }
+ } catch (Exception e) {
+ // TODO: log, send error?
+ e.printStackTrace();
} finally {
connection.close();
}