summaryrefslogtreecommitdiffstats
path: root/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java
diff options
context:
space:
mode:
Diffstat (limited to 'sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java')
-rw-r--r--sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java91
1 files changed, 67 insertions, 24 deletions
diff --git a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java
index 9ed3713db4..9a540fd5bb 100644
--- a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java
+++ b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java
@@ -19,12 +19,16 @@
package org.apache.tuscany.sca.binding.erlang.impl;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.apache.tuscany.sca.binding.erlang.ErlangBinding;
import org.apache.tuscany.sca.binding.erlang.impl.exceptions.ErlangException;
import org.apache.tuscany.sca.binding.erlang.impl.types.TypeHelpersProxy;
import org.apache.tuscany.sca.invocation.Invoker;
import org.apache.tuscany.sca.invocation.Message;
+import com.ericsson.otp.erlang.OtpAuthException;
import com.ericsson.otp.erlang.OtpConnection;
import com.ericsson.otp.erlang.OtpEpmd;
import com.ericsson.otp.erlang.OtpErlangList;
@@ -41,32 +45,63 @@ import com.ericsson.otp.erlang.OtpSelf;
*/
public class ErlangInvoker implements Invoker {
+ private static final Logger logger = Logger.getLogger(ErlangInvoker.class
+ .getName());
+
private ErlangBinding binding;
- private OtpNode node;
public ErlangInvoker(ErlangBinding binding) {
this.binding = binding;
}
+ private void reportProblem(Message msg, Exception e) {
+ if (msg.getOperation().getFaultTypes().size() > 0) {
+ msg.setFaultBody(e);
+ } else {
+ // NOTE: don't throw exception if not declared
+ // TODO: externalize message?
+ logger
+ .log(Level.WARNING, "Problem while sending/receiving data",
+ e);
+ }
+ }
+
+ private boolean isCookieProvided() throws Exception {
+ return binding.getCookie() != null && binding.getCookie().length() > 0;
+ }
+
+ private String getClientNodeName() {
+ return "_connector_to_" + binding.getNode()
+ + System.currentTimeMillis();
+ }
+
private Message sendMessage(Message msg) {
OtpMbox tmpMbox = null;
+ OtpNode node = null;
try {
- String nodeName = "_connector_to_" + binding.getNode();
- node = new OtpNode(nodeName);
+ node = new OtpNode(getClientNodeName());
+ if (isCookieProvided()) {
+ node.setCookie(binding.getCookie());
+ }
tmpMbox = node.createMbox();
Object[] args = msg.getBody();
OtpErlangObject msgPayload = TypeHelpersProxy.toErlang(args);
tmpMbox.send(msg.getOperation().getName(), binding.getNode(),
msgPayload);
if (msg.getOperation().getOutputType() != null) {
- // TODO: add timeouts, timeout declaration method?
- OtpMsg resultMsg = tmpMbox.receiveMsg();
+ OtpMsg resultMsg = tmpMbox.receiveMsg(binding.getTimeout());
OtpErlangObject result = resultMsg.getMsg();
msg.setBody(TypeHelpersProxy.toJava(result, msg.getOperation()
.getOutputType().getPhysical()));
}
+ } catch (InterruptedException e) {
+ // TODO: externalize message?
+ ErlangException ee = new ErlangException(
+ "Timeout while receiving message reply", e);
+ msg.setBody(null);
+ reportProblem(msg, ee);
} catch (Exception e) {
- msg.setFaultBody(e);
+ reportProblem(msg, e);
} finally {
if (tmpMbox != null) {
tmpMbox.close();
@@ -84,8 +119,10 @@ public class ErlangInvoker implements Invoker {
OtpPeer other = null;
OtpConnection connection = null;
try {
- String nodeName = "_connector_to_" + binding.getNode();
- self = new OtpSelf(nodeName);
+ self = new OtpSelf(getClientNodeName());
+ if (isCookieProvided()) {
+ self.setCookie(binding.getCookie());
+ }
other = new OtpPeer(binding.getNode());
connection = self.connect(other);
OtpErlangList params = TypeHelpersProxy
@@ -94,23 +131,18 @@ public class ErlangInvoker implements Invoker {
.createRef(), binding.getModule(), msg.getOperation()
.getName(), params);
connection.send(MessageHelper.RPC_MBOX, message);
- OtpErlangObject result = connection.receiveRPC();
+ OtpErlangObject rpcResponse = connection.receive(binding
+ .getTimeout());
+ OtpErlangObject result = ((OtpErlangTuple) rpcResponse)
+ .elementAt(1);
if (MessageHelper.isfunctionUndefMessage(result)) {
// TODO: externalize message?
- Exception e = new ErlangException(
- "No such function in referenced Erlang node.");
- if (msg.getOperation().getFaultTypes().size() == 0) {
- // TODO: no way to throw exception, log it (temporary as
- // System.out)
- // TODO: do we really want not to throw any exception?
- System.out.println("PROBLEM: " + e.getMessage());
- // in this case we don't throw occured problem, so we need
- // to reset message body (if body is not cleared then
- // operation arguments would be set as operation output)
- msg.setBody(null);
- } else {
- msg.setFaultBody(e);
- }
+ Exception e = new ErlangException("No '" + binding.getModule()
+ + ":" + msg.getOperation().getName()
+ + "' operation defined on remote '" + binding.getNode()
+ + "' node.");
+ reportProblem(msg, e);
+ msg.setBody(null);
} else if (msg.getOperation().getOutputType() != null) {
if (result.getClass().equals(OtpErlangTuple.class)) {
OtpErlangObject resultBody = ((OtpErlangTuple) result)
@@ -119,8 +151,19 @@ public class ErlangInvoker implements Invoker {
.getOperation().getOutputType().getPhysical()));
}
}
+ } catch (OtpAuthException e) {
+ // TODO: externalize message?
+ ErlangException ee = new ErlangException("Problem while authenticating client - check your cookie", e);
+ msg.setBody(null);
+ reportProblem(msg, ee);
+ } catch (InterruptedException e) {
+ // TODO: externalize message?
+ ErlangException ee = new ErlangException(
+ "Timeout while receiving RPC reply", e);
+ msg.setBody(null);
+ reportProblem(msg, ee);
} catch (Exception e) {
- msg.setFaultBody(e);
+ reportProblem(msg, e);
} finally {
if (connection != null) {
connection.close();