diff options
Diffstat (limited to '')
6 files changed, 130 insertions, 146 deletions
diff --git a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangBindingProviderFactory.java b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangBindingProviderFactory.java index d01d266332..f7ea057e56 100644 --- a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangBindingProviderFactory.java +++ b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangBindingProviderFactory.java @@ -19,8 +19,8 @@ package org.apache.tuscany.sca.binding.erlang.impl;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -39,9 +39,10 @@ import org.apache.tuscany.sca.runtime.RuntimeComponentService; public class ErlangBindingProviderFactory implements
BindingProviderFactory<ErlangBinding> {
- private Map<String, ErlangNode> nodes = new HashMap<String, ErlangNode>();
private static final Logger logger = Logger
.getLogger(ErlangBindingProviderFactory.class.getName());
+
+ private Set<String> nodes = new HashSet<String>();
public ErlangBindingProviderFactory(ExtensionPointRegistry registry) {
@@ -68,10 +69,18 @@ public class ErlangBindingProviderFactory implements ErlangBinding binding) {
ServiceBindingProvider provider = null;
try {
- provider = new ErlangServiceBindingProvider(getErlangNode(binding
- .getNode()), binding, service);
+ if (nodes.contains(binding.getNode())) {
+ // TODO: externalize message?
+ logger.log(Level.WARNING,
+ "Node name '" + binding.getNode() + "' already registered. This service will not be spawned.");
+ } else {
+ provider = new ErlangServiceBindingProvider(binding, service);
+ nodes.add(binding.getNode());
+ }
} catch (Exception e) {
- logger.log(Level.WARNING, "Exception during creating ServiceBindingProvider", e);
+ // TODO: externalize message?
+ logger.log(Level.WARNING,
+ "Exception during creating ServiceBindingProvider", e);
}
return provider;
}
@@ -82,16 +91,4 @@ public class ErlangBindingProviderFactory implements public Class<ErlangBinding> getModelType() {
return ErlangBinding.class;
}
-
- private ErlangNode getErlangNode(String name) throws Exception {
- ErlangNode result = null;
- if (nodes.containsKey(name)) {
- result = nodes.get(name);
- } else {
- result = new ErlangNode(name);
- nodes.put(name, result);
- }
- return result;
- }
-
}
diff --git a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java index 9a540fd5bb..7cd23aa1b1 100644 --- a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java +++ b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java @@ -66,10 +66,6 @@ public class ErlangInvoker implements Invoker { }
}
- private boolean isCookieProvided() throws Exception {
- return binding.getCookie() != null && binding.getCookie().length() > 0;
- }
-
private String getClientNodeName() {
return "_connector_to_" + binding.getNode()
+ System.currentTimeMillis();
@@ -80,7 +76,7 @@ public class ErlangInvoker implements Invoker { OtpNode node = null;
try {
node = new OtpNode(getClientNodeName());
- if (isCookieProvided()) {
+ if (binding.hasCookie()) {
node.setCookie(binding.getCookie());
}
tmpMbox = node.createMbox();
@@ -89,7 +85,12 @@ public class ErlangInvoker implements Invoker { tmpMbox.send(msg.getOperation().getName(), binding.getNode(),
msgPayload);
if (msg.getOperation().getOutputType() != null) {
- OtpMsg resultMsg = tmpMbox.receiveMsg(binding.getTimeout());
+ OtpMsg resultMsg = null;
+ if (binding.hasTimeout()) {
+ resultMsg = tmpMbox.receiveMsg(binding.getTimeout());
+ } else {
+ resultMsg = tmpMbox.receiveMsg();
+ }
OtpErlangObject result = resultMsg.getMsg();
msg.setBody(TypeHelpersProxy.toJava(result, msg.getOperation()
.getOutputType().getPhysical()));
@@ -120,7 +121,7 @@ public class ErlangInvoker implements Invoker { OtpConnection connection = null;
try {
self = new OtpSelf(getClientNodeName());
- if (isCookieProvided()) {
+ if (binding.hasCookie()) {
self.setCookie(binding.getCookie());
}
other = new OtpPeer(binding.getNode());
@@ -131,8 +132,12 @@ public class ErlangInvoker implements Invoker { .createRef(), binding.getModule(), msg.getOperation()
.getName(), params);
connection.send(MessageHelper.RPC_MBOX, message);
- OtpErlangObject rpcResponse = connection.receive(binding
- .getTimeout());
+ OtpErlangObject rpcResponse = null;
+ if (binding.hasTimeout()) {
+ rpcResponse = connection.receive(binding.getTimeout());
+ } else {
+ rpcResponse = connection.receive();
+ }
OtpErlangObject result = ((OtpErlangTuple) rpcResponse)
.elementAt(1);
if (MessageHelper.isfunctionUndefMessage(result)) {
@@ -153,7 +158,9 @@ public class ErlangInvoker implements Invoker { }
} catch (OtpAuthException e) {
// TODO: externalize message?
- ErlangException ee = new ErlangException("Problem while authenticating client - check your cookie", e);
+ ErlangException ee = new ErlangException(
+ "Problem while authenticating client - check your cookie",
+ e);
msg.setBody(null);
reportProblem(msg, ee);
} catch (InterruptedException e) {
diff --git a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangNode.java b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangNode.java index beeec9fffb..94bac31809 100644 --- a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangNode.java +++ b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangNode.java @@ -46,16 +46,15 @@ public class ErlangNode implements Runnable { private static final Logger logger = Logger.getLogger(ErlangNode.class .getName()); - private Map<String, ErlangNodeElement> erlangModules = new HashMap<String, ErlangNodeElement>(); - private ErlangNodeElement erlangMbox; - private boolean mboxNode; + private ErlangNodeElement nodeElement; private String name; private OtpSelf self; private ExecutorService executors; private boolean stopRequested; private Map<String, List<Operation>> groupedOperations; - public ErlangNode(String name) throws Exception { + public ErlangNode(String name, ErlangBinding binding, + RuntimeComponentService service) throws Exception { this.name = name; self = new OtpSelf(name); boolean registered = self.publishPort(); @@ -64,9 +63,13 @@ public class ErlangNode implements Runnable { throw new ErlangException( "Problem with publishing service under epmd server."); } + if (binding.hasCookie()) { + self.setCookie(binding.getCookie()); + } + registerBinding(binding, service); } - private void stop() { + public void stop() { stopRequested = true; executors.shutdownNow(); } @@ -78,76 +81,38 @@ public class ErlangNode implements Runnable { try { OtpConnection connection = self.accept(); executors.execute(new ServiceExecutor(connection, - groupedOperations, erlangModules, erlangMbox, name)); + groupedOperations, nodeElement, name)); } catch (IOException e) { // TODO: externalzie message? logger.log(Level.WARNING, "Error occured while accepting connection on '" + name - + "' node"); + + "' node", e); } catch (OtpAuthException e) { - // TODO: log bad authentication attempt + // TODO: externalize message? + logger.log(Level.WARNING, "Error while authenticating client", e); } } } - public void registerBinding(ErlangBinding binding, + private void registerBinding(ErlangBinding binding, RuntimeComponentService service) throws ErlangException { if (binding.isMbox()) { - if (mboxNode) { - // TODO: externalize message? - // NOTE: if mbox registered more than once for node then - // exception will be thrown - throw new ErlangException("Node " + binding.getNode() - + " already defined as mbox node"); - } else { - List<Operation> operations = service.getInterfaceContract() - .getInterface().getOperations(); - groupedOperations = new HashMap<String, List<Operation>>(); - for (Operation operation : operations) { - List<Operation> operationsGroup = groupedOperations - .get(operation.getName()); - if (operationsGroup == null) { - operationsGroup = new ArrayList<Operation>(); - groupedOperations.put(operation.getName(), - operationsGroup); - } - operationsGroup.add(operation); - } - mboxNode = true; - erlangMbox = new ErlangNodeElement(); - erlangMbox.setService(service); - erlangMbox.setBinding(binding); - } - } else { - if (erlangModules.containsKey(binding.getModule())) { - // TODO: externalize message? - // NOTE: if the same module was registered more than once than - // exception will be thrown - throw new ErlangException("Module " + binding.getModule() - + " already defined under " + name - + " node. Duplicate module won't be started"); - } else { - if (erlangModules.size() == 0) { - // NOTE: Erlang node is managing it's thread by itself. Just noticing. - Thread selfThread = new Thread(this); - selfThread.start(); + List<Operation> operations = service.getInterfaceContract() + .getInterface().getOperations(); + groupedOperations = new HashMap<String, List<Operation>>(); + for (Operation operation : operations) { + List<Operation> operationsGroup = groupedOperations + .get(operation.getName()); + if (operationsGroup == null) { + operationsGroup = new ArrayList<Operation>(); + groupedOperations.put(operation.getName(), operationsGroup); } - ErlangNodeElement module = new ErlangNodeElement(); - module.setService(service); - module.setBinding(binding); - erlangModules.put(binding.getModule(), module); - } - } - } - - public void unregisterBinding(ErlangBinding binding) throws ErlangException { - if (erlangModules.containsKey(binding.getModule())) { - erlangModules.remove(binding.getModule()); - erlangModules.remove(binding.getModule()); - if (erlangModules.size() == 0) { - stop(); + operationsGroup.add(operation); } } + nodeElement = new ErlangNodeElement(); + nodeElement.setService(service); + nodeElement.setBinding(binding); } } diff --git a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangServiceBindingProvider.java b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangServiceBindingProvider.java index 38b9b197d6..c2546a4336 100644 --- a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangServiceBindingProvider.java +++ b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangServiceBindingProvider.java @@ -32,13 +32,11 @@ public class ErlangServiceBindingProvider implements ServiceBindingProvider { private RuntimeComponentService service;
private ErlangNode node;
- private ErlangBinding binding;
- public ErlangServiceBindingProvider(ErlangNode node, ErlangBinding binding,
- RuntimeComponentService service) {
+ public ErlangServiceBindingProvider(ErlangBinding binding,
+ RuntimeComponentService service) throws Exception {
this.service = service;
- this.binding = binding;
- this.node = node;
+ this.node = new ErlangNode(binding.getNode(), binding, service);
}
/**
@@ -53,7 +51,8 @@ public class ErlangServiceBindingProvider implements ServiceBindingProvider { */
public void start() {
try {
- node.registerBinding(binding, service);
+ Thread thread = new Thread(node);
+ thread.start();
} catch (Exception e) {
throw new ServiceRuntimeException(e);
}
@@ -65,7 +64,7 @@ public class ErlangServiceBindingProvider implements ServiceBindingProvider { */
public void stop() {
try {
- node.unregisterBinding(binding);
+ node.stop();
} catch (Exception e) {
throw new ServiceRuntimeException(e);
}
diff --git a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java index a03d522860..001256f5f0 100644 --- a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java +++ b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java @@ -35,6 +35,7 @@ import org.apache.tuscany.sca.runtime.RuntimeComponentService; import com.ericsson.otp.erlang.OtpAuthException; import com.ericsson.otp.erlang.OtpConnection; import com.ericsson.otp.erlang.OtpErlangAtom; +import com.ericsson.otp.erlang.OtpErlangDecodeException; import com.ericsson.otp.erlang.OtpErlangExit; import com.ericsson.otp.erlang.OtpErlangList; import com.ericsson.otp.erlang.OtpErlangObject; @@ -53,22 +54,18 @@ public class ServiceExecutor implements Runnable { private static final Logger logger = Logger.getLogger(ServiceExecutor.class .getName()); - private static final long RECEIVE_TIMEOUT = 60000; - private Map<String, ErlangNodeElement> erlangModules; - private ErlangNodeElement erlangMbox; + private ErlangNodeElement nodeElement; private OtpConnection connection; private Map<String, List<Operation>> groupedOperations; private String name; public ServiceExecutor(OtpConnection connection, Map<String, List<Operation>> groupedOperations, - Map<String, ErlangNodeElement> erlangModules, - ErlangNodeElement erlangMbox, String name) { - this.erlangModules = erlangModules; + ErlangNodeElement nodeElement, String name) { this.connection = connection; this.groupedOperations = groupedOperations; - this.erlangMbox = erlangMbox; + this.nodeElement = nodeElement; this.name = name; } @@ -82,6 +79,10 @@ public class ServiceExecutor implements Runnable { connection.send(pid, msg); } + private String getResponseClientNodeName(OtpErlangPid pid) { + return "_response_connector_to_" + pid + System.currentTimeMillis(); + } + private void handleRpc(OtpMsg msg) { OtpErlangTuple request = null; OtpErlangPid senderPid = null; @@ -103,7 +104,7 @@ public class ServiceExecutor implements Runnable { } else { argsList = new OtpErlangList(args); } - if (!erlangModules.containsKey(module)) { + if (!nodeElement.getBinding().getModule().equals(module)) { // TODO: externalize message? OtpErlangObject errorMsg = MessageHelper.functionUndefMessage( module, function, argsList, @@ -111,9 +112,8 @@ public class ServiceExecutor implements Runnable { sendMessage(connection, senderPid, senderRef, MessageHelper.ATOM_BADRPC, errorMsg); } else { - RuntimeComponentService service = erlangModules.get(module) - .getService(); - ErlangBinding binding = erlangModules.get(module).getBinding(); + RuntimeComponentService service = nodeElement.getService(); + ErlangBinding binding = nodeElement.getBinding(); List<Operation> operations = service.getInterfaceContract() .getInterface().getOperations(); Operation operation = null; @@ -177,6 +177,18 @@ public class ServiceExecutor implements Runnable { MessageHelper.ATOM_BADRPC, errorMsg); } } + } catch (ClassCastException e) { + // TODO: externalize message? + try { + logger + .log( + Level.WARNING, + "On node '" + + nodeElement.getBinding().getNode() + + "' received RPC request which is invalid. Request content is: " + + msg.getMsg()); + } catch (OtpErlangDecodeException e1) { + } } catch (Exception e) { try { sendMessage(connection, senderPid, senderRef, @@ -184,11 +196,10 @@ public class ServiceExecutor implements Runnable { "Unhandled error while processing request: " + e.getClass().getCanonicalName() + ", message: " + e.getMessage())); - } catch (IOException e1) { + } catch (Exception e1) { // error while sending error message. Can't do anything now logger.log(Level.WARNING, "Error during sending error message", e); - e.printStackTrace(); } } } @@ -201,7 +212,7 @@ public class ServiceExecutor implements Runnable { if (operations == null) { // TODO: externalize message? // NOTE: I assume in Erlang sender doesn't get confirmation so - // message will be send + // no message will be send logger.log(Level.WARNING, "Node '" + name + "' received message addressed to non exising mbox: " + msg.getRecipientName()); @@ -224,8 +235,8 @@ public class ServiceExecutor implements Runnable { } if (matchedOperation != null) { try { - Object result = erlangMbox.getService().getRuntimeWire( - erlangMbox.getBinding()).invoke(matchedOperation, + Object result = nodeElement.getService().getRuntimeWire( + nodeElement.getBinding()).invoke(matchedOperation, args); OtpErlangObject response = null; if (matchedOperation.getOutputType() != null @@ -237,50 +248,65 @@ public class ServiceExecutor implements Runnable { response = TypeHelpersProxy.toErlang(arrArg); } if (response != null) { - OtpNode node = new OtpNode("_response_connector_to_" - + msg.getSenderPid()); + OtpNode node = new OtpNode( + getResponseClientNodeName(msg.getSenderPid())); OtpMbox mbox = node.createMbox(); mbox.send(msg.getSenderPid(), response); } } catch (InvocationTargetException e) { // FIXME: use linking feature? send some error? e.printStackTrace(); - } catch (IOException e) { + // } catch (IOException e) { + } catch (Exception e) { // FIXME: log this problem? use linking feature? send error? e.printStackTrace(); } } else { - // TODO: externalize message? - // NOTE: don't send error message if mapping not found - logger.log(Level.WARNING, "No mapping for such arguments in '" - + msg.getRecipientName() + "' operation in '" + name - + "' node."); + try { + // TODO: externalize message? + // NOTE: don't send error message if mapping not found + logger.log(Level.WARNING, + "No mapping for such arguments in '" + + msg.getRecipientName() + + "' operation in '" + name + + "' node. Recevied arguments: " + + msg.getMsg()); + } catch (OtpErlangDecodeException e) { + } } } } public void run() { try { - // TODO: should receive timeout be configured in .composite file? - OtpMsg msg = connection.receiveMsg(RECEIVE_TIMEOUT); - if (msg.getRecipientName().equals(MessageHelper.RPC_MBOX)) { + // NOTE: there's also a timeout, like in reference bindings + OtpMsg msg = null; + if (nodeElement.getBinding().hasTimeout()) { + msg = connection.receiveMsg(nodeElement.getBinding() + .getTimeout()); + } else { + msg = connection.receiveMsg(); + } + if (msg.getRecipientName().equals(MessageHelper.RPC_MBOX) + && !nodeElement.getBinding().isMbox()) { handleRpc(msg); - } else if (msg != null) { + } else if (!msg.getRecipientName().equals(MessageHelper.RPC_MBOX) + && nodeElement.getBinding().isMbox()) { handleMsg(msg); } else { - // message receive timeout + // received wrong message type } } catch (IOException e) { // TODO: externalize message? logger.log(Level.WARNING, "Problem while receiving message", e); } catch (OtpErlangExit e) { // TODO: linking? - e.printStackTrace(); } catch (OtpAuthException e) { - // TODO: cookies? + // TODO: cookies? does this exception occur sometime? } catch (InterruptedException e) { - // TODO: when it could happen? - e.printStackTrace(); + // NOTE: timeout will be logged + // TODO: externalize message? + logger.log(Level.WARNING, "Timeout while waiting for request", e); } finally { connection.close(); } diff --git a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/types/TupleTypeHelper.java b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/types/TupleTypeHelper.java index 324cd736ab..474459b9dc 100644 --- a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/types/TupleTypeHelper.java +++ b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/types/TupleTypeHelper.java @@ -23,8 +23,6 @@ import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; -import org.apache.tuscany.sca.binding.erlang.impl.exceptions.ErlangException; - import com.ericsson.otp.erlang.OtpErlangObject; import com.ericsson.otp.erlang.OtpErlangTuple; @@ -38,16 +36,16 @@ public class TupleTypeHelper implements TypeHelper { List<OtpErlangObject> tupleMembers = new ArrayList<OtpErlangObject>(); Field[] fields = forClass.getFields(); for (int i = 0; i < fields.length; i++) { - Object[] args; + Object[] args = null; try { args = new Object[] { fields[i].get(object) }; - OtpErlangObject member = TypeHelpersProxy.toErlang(args); - tupleMembers.add(member); - } catch (Exception e) { - // TODO: declaring toErlang method with Exception and throwing - // this? - e.printStackTrace(); + } catch (IllegalArgumentException e) { + // no problem should occur here + } catch (IllegalAccessException e) { + // and here } + OtpErlangObject member = TypeHelpersProxy.toErlang(args); + tupleMembers.add(member); } OtpErlangObject result = new OtpErlangTuple(tupleMembers .toArray(new OtpErlangObject[tupleMembers.size()])); @@ -59,14 +57,6 @@ public class TupleTypeHelper implements TypeHelper { Object result = null; OtpErlangTuple tuple = (OtpErlangTuple) object; Field[] fields = forClass.getFields(); - if (fields.length != tuple.arity()) { - throw new ErlangException( - "Received tuple with different element count (" - + tuple.arity() + ") than expected (" - + fields.length + ")"); - // FIXME: JUnit this - received tuple with different element count - - // wrong message, exception! - } result = forClass.newInstance(); for (int i = 0; i < tuple.arity(); i++) { OtpErlangObject tupleMember = tuple.elementAt(i); |