From fce6ad4c30b15b4284a94e7a00e084bf18748cf2 Mon Sep 17 00:00:00 2001 From: wjaniszewski Date: Fri, 20 Mar 2009 14:23:51 +0000 Subject: Added configurable timeout and cookies for service bindings. Consequence is that one SCA-Erlang node is created excusively for one service binding, so service bindings cannot share the same value in 'node' attribute like it was before. Made some other fixes. git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@756480 13f79535-47bb-0310-9956-ffa450edef68 --- .../erlang/impl/ErlangBindingProviderFactory.java | 33 ++++---- .../sca/binding/erlang/impl/ErlangInvoker.java | 27 ++++--- .../sca/binding/erlang/impl/ErlangNode.java | 87 ++++++-------------- .../erlang/impl/ErlangServiceBindingProvider.java | 13 ++- .../sca/binding/erlang/impl/ServiceExecutor.java | 92 ++++++++++++++-------- .../binding/erlang/impl/types/TupleTypeHelper.java | 24 ++---- 6 files changed, 130 insertions(+), 146 deletions(-) (limited to 'sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany') diff --git a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangBindingProviderFactory.java b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangBindingProviderFactory.java index d01d266332..f7ea057e56 100644 --- a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangBindingProviderFactory.java +++ b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangBindingProviderFactory.java @@ -19,8 +19,8 @@ package org.apache.tuscany.sca.binding.erlang.impl; -import java.util.HashMap; -import java.util.Map; +import java.util.HashSet; +import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; @@ -39,9 +39,10 @@ import org.apache.tuscany.sca.runtime.RuntimeComponentService; public class ErlangBindingProviderFactory implements BindingProviderFactory { - private Map nodes = new HashMap(); private static final Logger logger = Logger .getLogger(ErlangBindingProviderFactory.class.getName()); + + private Set nodes = new HashSet(); public ErlangBindingProviderFactory(ExtensionPointRegistry registry) { @@ -68,10 +69,18 @@ public class ErlangBindingProviderFactory implements ErlangBinding binding) { ServiceBindingProvider provider = null; try { - provider = new ErlangServiceBindingProvider(getErlangNode(binding - .getNode()), binding, service); + if (nodes.contains(binding.getNode())) { + // TODO: externalize message? + logger.log(Level.WARNING, + "Node name '" + binding.getNode() + "' already registered. This service will not be spawned."); + } else { + provider = new ErlangServiceBindingProvider(binding, service); + nodes.add(binding.getNode()); + } } catch (Exception e) { - logger.log(Level.WARNING, "Exception during creating ServiceBindingProvider", e); + // TODO: externalize message? + logger.log(Level.WARNING, + "Exception during creating ServiceBindingProvider", e); } return provider; } @@ -82,16 +91,4 @@ public class ErlangBindingProviderFactory implements public Class getModelType() { return ErlangBinding.class; } - - private ErlangNode getErlangNode(String name) throws Exception { - ErlangNode result = null; - if (nodes.containsKey(name)) { - result = nodes.get(name); - } else { - result = new ErlangNode(name); - nodes.put(name, result); - } - return result; - } - } diff --git a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java index 9a540fd5bb..7cd23aa1b1 100644 --- a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java +++ b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java @@ -66,10 +66,6 @@ public class ErlangInvoker implements Invoker { } } - private boolean isCookieProvided() throws Exception { - return binding.getCookie() != null && binding.getCookie().length() > 0; - } - private String getClientNodeName() { return "_connector_to_" + binding.getNode() + System.currentTimeMillis(); @@ -80,7 +76,7 @@ public class ErlangInvoker implements Invoker { OtpNode node = null; try { node = new OtpNode(getClientNodeName()); - if (isCookieProvided()) { + if (binding.hasCookie()) { node.setCookie(binding.getCookie()); } tmpMbox = node.createMbox(); @@ -89,7 +85,12 @@ public class ErlangInvoker implements Invoker { tmpMbox.send(msg.getOperation().getName(), binding.getNode(), msgPayload); if (msg.getOperation().getOutputType() != null) { - OtpMsg resultMsg = tmpMbox.receiveMsg(binding.getTimeout()); + OtpMsg resultMsg = null; + if (binding.hasTimeout()) { + resultMsg = tmpMbox.receiveMsg(binding.getTimeout()); + } else { + resultMsg = tmpMbox.receiveMsg(); + } OtpErlangObject result = resultMsg.getMsg(); msg.setBody(TypeHelpersProxy.toJava(result, msg.getOperation() .getOutputType().getPhysical())); @@ -120,7 +121,7 @@ public class ErlangInvoker implements Invoker { OtpConnection connection = null; try { self = new OtpSelf(getClientNodeName()); - if (isCookieProvided()) { + if (binding.hasCookie()) { self.setCookie(binding.getCookie()); } other = new OtpPeer(binding.getNode()); @@ -131,8 +132,12 @@ public class ErlangInvoker implements Invoker { .createRef(), binding.getModule(), msg.getOperation() .getName(), params); connection.send(MessageHelper.RPC_MBOX, message); - OtpErlangObject rpcResponse = connection.receive(binding - .getTimeout()); + OtpErlangObject rpcResponse = null; + if (binding.hasTimeout()) { + rpcResponse = connection.receive(binding.getTimeout()); + } else { + rpcResponse = connection.receive(); + } OtpErlangObject result = ((OtpErlangTuple) rpcResponse) .elementAt(1); if (MessageHelper.isfunctionUndefMessage(result)) { @@ -153,7 +158,9 @@ public class ErlangInvoker implements Invoker { } } catch (OtpAuthException e) { // TODO: externalize message? - ErlangException ee = new ErlangException("Problem while authenticating client - check your cookie", e); + ErlangException ee = new ErlangException( + "Problem while authenticating client - check your cookie", + e); msg.setBody(null); reportProblem(msg, ee); } catch (InterruptedException e) { diff --git a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangNode.java b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangNode.java index beeec9fffb..94bac31809 100644 --- a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangNode.java +++ b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangNode.java @@ -46,16 +46,15 @@ public class ErlangNode implements Runnable { private static final Logger logger = Logger.getLogger(ErlangNode.class .getName()); - private Map erlangModules = new HashMap(); - private ErlangNodeElement erlangMbox; - private boolean mboxNode; + private ErlangNodeElement nodeElement; private String name; private OtpSelf self; private ExecutorService executors; private boolean stopRequested; private Map> groupedOperations; - public ErlangNode(String name) throws Exception { + public ErlangNode(String name, ErlangBinding binding, + RuntimeComponentService service) throws Exception { this.name = name; self = new OtpSelf(name); boolean registered = self.publishPort(); @@ -64,9 +63,13 @@ public class ErlangNode implements Runnable { throw new ErlangException( "Problem with publishing service under epmd server."); } + if (binding.hasCookie()) { + self.setCookie(binding.getCookie()); + } + registerBinding(binding, service); } - private void stop() { + public void stop() { stopRequested = true; executors.shutdownNow(); } @@ -78,76 +81,38 @@ public class ErlangNode implements Runnable { try { OtpConnection connection = self.accept(); executors.execute(new ServiceExecutor(connection, - groupedOperations, erlangModules, erlangMbox, name)); + groupedOperations, nodeElement, name)); } catch (IOException e) { // TODO: externalzie message? logger.log(Level.WARNING, "Error occured while accepting connection on '" + name - + "' node"); + + "' node", e); } catch (OtpAuthException e) { - // TODO: log bad authentication attempt + // TODO: externalize message? + logger.log(Level.WARNING, "Error while authenticating client", e); } } } - public void registerBinding(ErlangBinding binding, + private void registerBinding(ErlangBinding binding, RuntimeComponentService service) throws ErlangException { if (binding.isMbox()) { - if (mboxNode) { - // TODO: externalize message? - // NOTE: if mbox registered more than once for node then - // exception will be thrown - throw new ErlangException("Node " + binding.getNode() - + " already defined as mbox node"); - } else { - List operations = service.getInterfaceContract() - .getInterface().getOperations(); - groupedOperations = new HashMap>(); - for (Operation operation : operations) { - List operationsGroup = groupedOperations - .get(operation.getName()); - if (operationsGroup == null) { - operationsGroup = new ArrayList(); - groupedOperations.put(operation.getName(), - operationsGroup); - } - operationsGroup.add(operation); - } - mboxNode = true; - erlangMbox = new ErlangNodeElement(); - erlangMbox.setService(service); - erlangMbox.setBinding(binding); - } - } else { - if (erlangModules.containsKey(binding.getModule())) { - // TODO: externalize message? - // NOTE: if the same module was registered more than once than - // exception will be thrown - throw new ErlangException("Module " + binding.getModule() - + " already defined under " + name - + " node. Duplicate module won't be started"); - } else { - if (erlangModules.size() == 0) { - // NOTE: Erlang node is managing it's thread by itself. Just noticing. - Thread selfThread = new Thread(this); - selfThread.start(); + List operations = service.getInterfaceContract() + .getInterface().getOperations(); + groupedOperations = new HashMap>(); + for (Operation operation : operations) { + List operationsGroup = groupedOperations + .get(operation.getName()); + if (operationsGroup == null) { + operationsGroup = new ArrayList(); + groupedOperations.put(operation.getName(), operationsGroup); } - ErlangNodeElement module = new ErlangNodeElement(); - module.setService(service); - module.setBinding(binding); - erlangModules.put(binding.getModule(), module); - } - } - } - - public void unregisterBinding(ErlangBinding binding) throws ErlangException { - if (erlangModules.containsKey(binding.getModule())) { - erlangModules.remove(binding.getModule()); - erlangModules.remove(binding.getModule()); - if (erlangModules.size() == 0) { - stop(); + operationsGroup.add(operation); } } + nodeElement = new ErlangNodeElement(); + nodeElement.setService(service); + nodeElement.setBinding(binding); } } diff --git a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangServiceBindingProvider.java b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangServiceBindingProvider.java index 38b9b197d6..c2546a4336 100644 --- a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangServiceBindingProvider.java +++ b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangServiceBindingProvider.java @@ -32,13 +32,11 @@ public class ErlangServiceBindingProvider implements ServiceBindingProvider { private RuntimeComponentService service; private ErlangNode node; - private ErlangBinding binding; - public ErlangServiceBindingProvider(ErlangNode node, ErlangBinding binding, - RuntimeComponentService service) { + public ErlangServiceBindingProvider(ErlangBinding binding, + RuntimeComponentService service) throws Exception { this.service = service; - this.binding = binding; - this.node = node; + this.node = new ErlangNode(binding.getNode(), binding, service); } /** @@ -53,7 +51,8 @@ public class ErlangServiceBindingProvider implements ServiceBindingProvider { */ public void start() { try { - node.registerBinding(binding, service); + Thread thread = new Thread(node); + thread.start(); } catch (Exception e) { throw new ServiceRuntimeException(e); } @@ -65,7 +64,7 @@ public class ErlangServiceBindingProvider implements ServiceBindingProvider { */ public void stop() { try { - node.unregisterBinding(binding); + node.stop(); } catch (Exception e) { throw new ServiceRuntimeException(e); } diff --git a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java index a03d522860..001256f5f0 100644 --- a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java +++ b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java @@ -35,6 +35,7 @@ import org.apache.tuscany.sca.runtime.RuntimeComponentService; import com.ericsson.otp.erlang.OtpAuthException; import com.ericsson.otp.erlang.OtpConnection; import com.ericsson.otp.erlang.OtpErlangAtom; +import com.ericsson.otp.erlang.OtpErlangDecodeException; import com.ericsson.otp.erlang.OtpErlangExit; import com.ericsson.otp.erlang.OtpErlangList; import com.ericsson.otp.erlang.OtpErlangObject; @@ -53,22 +54,18 @@ public class ServiceExecutor implements Runnable { private static final Logger logger = Logger.getLogger(ServiceExecutor.class .getName()); - private static final long RECEIVE_TIMEOUT = 60000; - private Map erlangModules; - private ErlangNodeElement erlangMbox; + private ErlangNodeElement nodeElement; private OtpConnection connection; private Map> groupedOperations; private String name; public ServiceExecutor(OtpConnection connection, Map> groupedOperations, - Map erlangModules, - ErlangNodeElement erlangMbox, String name) { - this.erlangModules = erlangModules; + ErlangNodeElement nodeElement, String name) { this.connection = connection; this.groupedOperations = groupedOperations; - this.erlangMbox = erlangMbox; + this.nodeElement = nodeElement; this.name = name; } @@ -82,6 +79,10 @@ public class ServiceExecutor implements Runnable { connection.send(pid, msg); } + private String getResponseClientNodeName(OtpErlangPid pid) { + return "_response_connector_to_" + pid + System.currentTimeMillis(); + } + private void handleRpc(OtpMsg msg) { OtpErlangTuple request = null; OtpErlangPid senderPid = null; @@ -103,7 +104,7 @@ public class ServiceExecutor implements Runnable { } else { argsList = new OtpErlangList(args); } - if (!erlangModules.containsKey(module)) { + if (!nodeElement.getBinding().getModule().equals(module)) { // TODO: externalize message? OtpErlangObject errorMsg = MessageHelper.functionUndefMessage( module, function, argsList, @@ -111,9 +112,8 @@ public class ServiceExecutor implements Runnable { sendMessage(connection, senderPid, senderRef, MessageHelper.ATOM_BADRPC, errorMsg); } else { - RuntimeComponentService service = erlangModules.get(module) - .getService(); - ErlangBinding binding = erlangModules.get(module).getBinding(); + RuntimeComponentService service = nodeElement.getService(); + ErlangBinding binding = nodeElement.getBinding(); List operations = service.getInterfaceContract() .getInterface().getOperations(); Operation operation = null; @@ -177,6 +177,18 @@ public class ServiceExecutor implements Runnable { MessageHelper.ATOM_BADRPC, errorMsg); } } + } catch (ClassCastException e) { + // TODO: externalize message? + try { + logger + .log( + Level.WARNING, + "On node '" + + nodeElement.getBinding().getNode() + + "' received RPC request which is invalid. Request content is: " + + msg.getMsg()); + } catch (OtpErlangDecodeException e1) { + } } catch (Exception e) { try { sendMessage(connection, senderPid, senderRef, @@ -184,11 +196,10 @@ public class ServiceExecutor implements Runnable { "Unhandled error while processing request: " + e.getClass().getCanonicalName() + ", message: " + e.getMessage())); - } catch (IOException e1) { + } catch (Exception e1) { // error while sending error message. Can't do anything now logger.log(Level.WARNING, "Error during sending error message", e); - e.printStackTrace(); } } } @@ -201,7 +212,7 @@ public class ServiceExecutor implements Runnable { if (operations == null) { // TODO: externalize message? // NOTE: I assume in Erlang sender doesn't get confirmation so - // message will be send + // no message will be send logger.log(Level.WARNING, "Node '" + name + "' received message addressed to non exising mbox: " + msg.getRecipientName()); @@ -224,8 +235,8 @@ public class ServiceExecutor implements Runnable { } if (matchedOperation != null) { try { - Object result = erlangMbox.getService().getRuntimeWire( - erlangMbox.getBinding()).invoke(matchedOperation, + Object result = nodeElement.getService().getRuntimeWire( + nodeElement.getBinding()).invoke(matchedOperation, args); OtpErlangObject response = null; if (matchedOperation.getOutputType() != null @@ -237,50 +248,65 @@ public class ServiceExecutor implements Runnable { response = TypeHelpersProxy.toErlang(arrArg); } if (response != null) { - OtpNode node = new OtpNode("_response_connector_to_" - + msg.getSenderPid()); + OtpNode node = new OtpNode( + getResponseClientNodeName(msg.getSenderPid())); OtpMbox mbox = node.createMbox(); mbox.send(msg.getSenderPid(), response); } } catch (InvocationTargetException e) { // FIXME: use linking feature? send some error? e.printStackTrace(); - } catch (IOException e) { + // } catch (IOException e) { + } catch (Exception e) { // FIXME: log this problem? use linking feature? send error? e.printStackTrace(); } } else { - // TODO: externalize message? - // NOTE: don't send error message if mapping not found - logger.log(Level.WARNING, "No mapping for such arguments in '" - + msg.getRecipientName() + "' operation in '" + name - + "' node."); + try { + // TODO: externalize message? + // NOTE: don't send error message if mapping not found + logger.log(Level.WARNING, + "No mapping for such arguments in '" + + msg.getRecipientName() + + "' operation in '" + name + + "' node. Recevied arguments: " + + msg.getMsg()); + } catch (OtpErlangDecodeException e) { + } } } } public void run() { try { - // TODO: should receive timeout be configured in .composite file? - OtpMsg msg = connection.receiveMsg(RECEIVE_TIMEOUT); - if (msg.getRecipientName().equals(MessageHelper.RPC_MBOX)) { + // NOTE: there's also a timeout, like in reference bindings + OtpMsg msg = null; + if (nodeElement.getBinding().hasTimeout()) { + msg = connection.receiveMsg(nodeElement.getBinding() + .getTimeout()); + } else { + msg = connection.receiveMsg(); + } + if (msg.getRecipientName().equals(MessageHelper.RPC_MBOX) + && !nodeElement.getBinding().isMbox()) { handleRpc(msg); - } else if (msg != null) { + } else if (!msg.getRecipientName().equals(MessageHelper.RPC_MBOX) + && nodeElement.getBinding().isMbox()) { handleMsg(msg); } else { - // message receive timeout + // received wrong message type } } catch (IOException e) { // TODO: externalize message? logger.log(Level.WARNING, "Problem while receiving message", e); } catch (OtpErlangExit e) { // TODO: linking? - e.printStackTrace(); } catch (OtpAuthException e) { - // TODO: cookies? + // TODO: cookies? does this exception occur sometime? } catch (InterruptedException e) { - // TODO: when it could happen? - e.printStackTrace(); + // NOTE: timeout will be logged + // TODO: externalize message? + logger.log(Level.WARNING, "Timeout while waiting for request", e); } finally { connection.close(); } diff --git a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/types/TupleTypeHelper.java b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/types/TupleTypeHelper.java index 324cd736ab..474459b9dc 100644 --- a/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/types/TupleTypeHelper.java +++ b/sandbox/wjaniszewski/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/types/TupleTypeHelper.java @@ -23,8 +23,6 @@ import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; -import org.apache.tuscany.sca.binding.erlang.impl.exceptions.ErlangException; - import com.ericsson.otp.erlang.OtpErlangObject; import com.ericsson.otp.erlang.OtpErlangTuple; @@ -38,16 +36,16 @@ public class TupleTypeHelper implements TypeHelper { List tupleMembers = new ArrayList(); Field[] fields = forClass.getFields(); for (int i = 0; i < fields.length; i++) { - Object[] args; + Object[] args = null; try { args = new Object[] { fields[i].get(object) }; - OtpErlangObject member = TypeHelpersProxy.toErlang(args); - tupleMembers.add(member); - } catch (Exception e) { - // TODO: declaring toErlang method with Exception and throwing - // this? - e.printStackTrace(); + } catch (IllegalArgumentException e) { + // no problem should occur here + } catch (IllegalAccessException e) { + // and here } + OtpErlangObject member = TypeHelpersProxy.toErlang(args); + tupleMembers.add(member); } OtpErlangObject result = new OtpErlangTuple(tupleMembers .toArray(new OtpErlangObject[tupleMembers.size()])); @@ -59,14 +57,6 @@ public class TupleTypeHelper implements TypeHelper { Object result = null; OtpErlangTuple tuple = (OtpErlangTuple) object; Field[] fields = forClass.getFields(); - if (fields.length != tuple.arity()) { - throw new ErlangException( - "Received tuple with different element count (" - + tuple.arity() + ") than expected (" - + fields.length + ")"); - // FIXME: JUnit this - received tuple with different element count - - // wrong message, exception! - } result = forClass.newInstance(); for (int i = 0; i < tuple.arity(); i++) { OtpErlangObject tupleMember = tuple.elementAt(i); -- cgit v1.2.3