Added configurable timeout and cookies for service bindings. Consequence is that one SCA-Erlang node is created excusively for one service binding, so service bindings cannot share the same value in 'node' attribute like it was before. Made some other fixes.

git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@756480 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
wjaniszewski 2009-03-20 14:23:51 +00:00
parent e962b3bbdf
commit fce6ad4c30
14 changed files with 229 additions and 272 deletions

View file

@ -19,8 +19,8 @@
package org.apache.tuscany.sca.binding.erlang.impl;
import java.util.HashMap;
import java.util.Map;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -39,9 +39,10 @@ import org.apache.tuscany.sca.runtime.RuntimeComponentService;
public class ErlangBindingProviderFactory implements
BindingProviderFactory<ErlangBinding> {
private Map<String, ErlangNode> nodes = new HashMap<String, ErlangNode>();
private static final Logger logger = Logger
.getLogger(ErlangBindingProviderFactory.class.getName());
private Set<String> nodes = new HashSet<String>();
public ErlangBindingProviderFactory(ExtensionPointRegistry registry) {
@ -68,10 +69,18 @@ public class ErlangBindingProviderFactory implements
ErlangBinding binding) {
ServiceBindingProvider provider = null;
try {
provider = new ErlangServiceBindingProvider(getErlangNode(binding
.getNode()), binding, service);
if (nodes.contains(binding.getNode())) {
// TODO: externalize message?
logger.log(Level.WARNING,
"Node name '" + binding.getNode() + "' already registered. This service will not be spawned.");
} else {
provider = new ErlangServiceBindingProvider(binding, service);
nodes.add(binding.getNode());
}
} catch (Exception e) {
logger.log(Level.WARNING, "Exception during creating ServiceBindingProvider", e);
// TODO: externalize message?
logger.log(Level.WARNING,
"Exception during creating ServiceBindingProvider", e);
}
return provider;
}
@ -82,16 +91,4 @@ public class ErlangBindingProviderFactory implements
public Class<ErlangBinding> getModelType() {
return ErlangBinding.class;
}
private ErlangNode getErlangNode(String name) throws Exception {
ErlangNode result = null;
if (nodes.containsKey(name)) {
result = nodes.get(name);
} else {
result = new ErlangNode(name);
nodes.put(name, result);
}
return result;
}
}

View file

@ -66,10 +66,6 @@ public class ErlangInvoker implements Invoker {
}
}
private boolean isCookieProvided() throws Exception {
return binding.getCookie() != null && binding.getCookie().length() > 0;
}
private String getClientNodeName() {
return "_connector_to_" + binding.getNode()
+ System.currentTimeMillis();
@ -80,7 +76,7 @@ public class ErlangInvoker implements Invoker {
OtpNode node = null;
try {
node = new OtpNode(getClientNodeName());
if (isCookieProvided()) {
if (binding.hasCookie()) {
node.setCookie(binding.getCookie());
}
tmpMbox = node.createMbox();
@ -89,7 +85,12 @@ public class ErlangInvoker implements Invoker {
tmpMbox.send(msg.getOperation().getName(), binding.getNode(),
msgPayload);
if (msg.getOperation().getOutputType() != null) {
OtpMsg resultMsg = tmpMbox.receiveMsg(binding.getTimeout());
OtpMsg resultMsg = null;
if (binding.hasTimeout()) {
resultMsg = tmpMbox.receiveMsg(binding.getTimeout());
} else {
resultMsg = tmpMbox.receiveMsg();
}
OtpErlangObject result = resultMsg.getMsg();
msg.setBody(TypeHelpersProxy.toJava(result, msg.getOperation()
.getOutputType().getPhysical()));
@ -120,7 +121,7 @@ public class ErlangInvoker implements Invoker {
OtpConnection connection = null;
try {
self = new OtpSelf(getClientNodeName());
if (isCookieProvided()) {
if (binding.hasCookie()) {
self.setCookie(binding.getCookie());
}
other = new OtpPeer(binding.getNode());
@ -131,8 +132,12 @@ public class ErlangInvoker implements Invoker {
.createRef(), binding.getModule(), msg.getOperation()
.getName(), params);
connection.send(MessageHelper.RPC_MBOX, message);
OtpErlangObject rpcResponse = connection.receive(binding
.getTimeout());
OtpErlangObject rpcResponse = null;
if (binding.hasTimeout()) {
rpcResponse = connection.receive(binding.getTimeout());
} else {
rpcResponse = connection.receive();
}
OtpErlangObject result = ((OtpErlangTuple) rpcResponse)
.elementAt(1);
if (MessageHelper.isfunctionUndefMessage(result)) {
@ -153,7 +158,9 @@ public class ErlangInvoker implements Invoker {
}
} catch (OtpAuthException e) {
// TODO: externalize message?
ErlangException ee = new ErlangException("Problem while authenticating client - check your cookie", e);
ErlangException ee = new ErlangException(
"Problem while authenticating client - check your cookie",
e);
msg.setBody(null);
reportProblem(msg, ee);
} catch (InterruptedException e) {

View file

@ -46,16 +46,15 @@ public class ErlangNode implements Runnable {
private static final Logger logger = Logger.getLogger(ErlangNode.class
.getName());
private Map<String, ErlangNodeElement> erlangModules = new HashMap<String, ErlangNodeElement>();
private ErlangNodeElement erlangMbox;
private boolean mboxNode;
private ErlangNodeElement nodeElement;
private String name;
private OtpSelf self;
private ExecutorService executors;
private boolean stopRequested;
private Map<String, List<Operation>> groupedOperations;
public ErlangNode(String name) throws Exception {
public ErlangNode(String name, ErlangBinding binding,
RuntimeComponentService service) throws Exception {
this.name = name;
self = new OtpSelf(name);
boolean registered = self.publishPort();
@ -64,9 +63,13 @@ public class ErlangNode implements Runnable {
throw new ErlangException(
"Problem with publishing service under epmd server.");
}
if (binding.hasCookie()) {
self.setCookie(binding.getCookie());
}
registerBinding(binding, service);
}
private void stop() {
public void stop() {
stopRequested = true;
executors.shutdownNow();
}
@ -78,76 +81,38 @@ public class ErlangNode implements Runnable {
try {
OtpConnection connection = self.accept();
executors.execute(new ServiceExecutor(connection,
groupedOperations, erlangModules, erlangMbox, name));
groupedOperations, nodeElement, name));
} catch (IOException e) {
// TODO: externalzie message?
logger.log(Level.WARNING,
"Error occured while accepting connection on '" + name
+ "' node");
+ "' node", e);
} catch (OtpAuthException e) {
// TODO: log bad authentication attempt
// TODO: externalize message?
logger.log(Level.WARNING, "Error while authenticating client", e);
}
}
}
public void registerBinding(ErlangBinding binding,
private void registerBinding(ErlangBinding binding,
RuntimeComponentService service) throws ErlangException {
if (binding.isMbox()) {
if (mboxNode) {
// TODO: externalize message?
// NOTE: if mbox registered more than once for node then
// exception will be thrown
throw new ErlangException("Node " + binding.getNode()
+ " already defined as mbox node");
} else {
List<Operation> operations = service.getInterfaceContract()
.getInterface().getOperations();
groupedOperations = new HashMap<String, List<Operation>>();
for (Operation operation : operations) {
List<Operation> operationsGroup = groupedOperations
.get(operation.getName());
if (operationsGroup == null) {
operationsGroup = new ArrayList<Operation>();
groupedOperations.put(operation.getName(),
operationsGroup);
}
operationsGroup.add(operation);
List<Operation> operations = service.getInterfaceContract()
.getInterface().getOperations();
groupedOperations = new HashMap<String, List<Operation>>();
for (Operation operation : operations) {
List<Operation> operationsGroup = groupedOperations
.get(operation.getName());
if (operationsGroup == null) {
operationsGroup = new ArrayList<Operation>();
groupedOperations.put(operation.getName(), operationsGroup);
}
mboxNode = true;
erlangMbox = new ErlangNodeElement();
erlangMbox.setService(service);
erlangMbox.setBinding(binding);
}
} else {
if (erlangModules.containsKey(binding.getModule())) {
// TODO: externalize message?
// NOTE: if the same module was registered more than once than
// exception will be thrown
throw new ErlangException("Module " + binding.getModule()
+ " already defined under " + name
+ " node. Duplicate module won't be started");
} else {
if (erlangModules.size() == 0) {
// NOTE: Erlang node is managing it's thread by itself. Just noticing.
Thread selfThread = new Thread(this);
selfThread.start();
}
ErlangNodeElement module = new ErlangNodeElement();
module.setService(service);
module.setBinding(binding);
erlangModules.put(binding.getModule(), module);
}
}
}
public void unregisterBinding(ErlangBinding binding) throws ErlangException {
if (erlangModules.containsKey(binding.getModule())) {
erlangModules.remove(binding.getModule());
erlangModules.remove(binding.getModule());
if (erlangModules.size() == 0) {
stop();
operationsGroup.add(operation);
}
}
nodeElement = new ErlangNodeElement();
nodeElement.setService(service);
nodeElement.setBinding(binding);
}
}

View file

@ -32,13 +32,11 @@ public class ErlangServiceBindingProvider implements ServiceBindingProvider {
private RuntimeComponentService service;
private ErlangNode node;
private ErlangBinding binding;
public ErlangServiceBindingProvider(ErlangNode node, ErlangBinding binding,
RuntimeComponentService service) {
public ErlangServiceBindingProvider(ErlangBinding binding,
RuntimeComponentService service) throws Exception {
this.service = service;
this.binding = binding;
this.node = node;
this.node = new ErlangNode(binding.getNode(), binding, service);
}
/**
@ -53,7 +51,8 @@ public class ErlangServiceBindingProvider implements ServiceBindingProvider {
*/
public void start() {
try {
node.registerBinding(binding, service);
Thread thread = new Thread(node);
thread.start();
} catch (Exception e) {
throw new ServiceRuntimeException(e);
}
@ -65,7 +64,7 @@ public class ErlangServiceBindingProvider implements ServiceBindingProvider {
*/
public void stop() {
try {
node.unregisterBinding(binding);
node.stop();
} catch (Exception e) {
throw new ServiceRuntimeException(e);
}

View file

@ -35,6 +35,7 @@ import org.apache.tuscany.sca.runtime.RuntimeComponentService;
import com.ericsson.otp.erlang.OtpAuthException;
import com.ericsson.otp.erlang.OtpConnection;
import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangDecodeException;
import com.ericsson.otp.erlang.OtpErlangExit;
import com.ericsson.otp.erlang.OtpErlangList;
import com.ericsson.otp.erlang.OtpErlangObject;
@ -53,22 +54,18 @@ public class ServiceExecutor implements Runnable {
private static final Logger logger = Logger.getLogger(ServiceExecutor.class
.getName());
private static final long RECEIVE_TIMEOUT = 60000;
private Map<String, ErlangNodeElement> erlangModules;
private ErlangNodeElement erlangMbox;
private ErlangNodeElement nodeElement;
private OtpConnection connection;
private Map<String, List<Operation>> groupedOperations;
private String name;
public ServiceExecutor(OtpConnection connection,
Map<String, List<Operation>> groupedOperations,
Map<String, ErlangNodeElement> erlangModules,
ErlangNodeElement erlangMbox, String name) {
this.erlangModules = erlangModules;
ErlangNodeElement nodeElement, String name) {
this.connection = connection;
this.groupedOperations = groupedOperations;
this.erlangMbox = erlangMbox;
this.nodeElement = nodeElement;
this.name = name;
}
@ -82,6 +79,10 @@ public class ServiceExecutor implements Runnable {
connection.send(pid, msg);
}
private String getResponseClientNodeName(OtpErlangPid pid) {
return "_response_connector_to_" + pid + System.currentTimeMillis();
}
private void handleRpc(OtpMsg msg) {
OtpErlangTuple request = null;
OtpErlangPid senderPid = null;
@ -103,7 +104,7 @@ public class ServiceExecutor implements Runnable {
} else {
argsList = new OtpErlangList(args);
}
if (!erlangModules.containsKey(module)) {
if (!nodeElement.getBinding().getModule().equals(module)) {
// TODO: externalize message?
OtpErlangObject errorMsg = MessageHelper.functionUndefMessage(
module, function, argsList,
@ -111,9 +112,8 @@ public class ServiceExecutor implements Runnable {
sendMessage(connection, senderPid, senderRef,
MessageHelper.ATOM_BADRPC, errorMsg);
} else {
RuntimeComponentService service = erlangModules.get(module)
.getService();
ErlangBinding binding = erlangModules.get(module).getBinding();
RuntimeComponentService service = nodeElement.getService();
ErlangBinding binding = nodeElement.getBinding();
List<Operation> operations = service.getInterfaceContract()
.getInterface().getOperations();
Operation operation = null;
@ -177,6 +177,18 @@ public class ServiceExecutor implements Runnable {
MessageHelper.ATOM_BADRPC, errorMsg);
}
}
} catch (ClassCastException e) {
// TODO: externalize message?
try {
logger
.log(
Level.WARNING,
"On node '"
+ nodeElement.getBinding().getNode()
+ "' received RPC request which is invalid. Request content is: "
+ msg.getMsg());
} catch (OtpErlangDecodeException e1) {
}
} catch (Exception e) {
try {
sendMessage(connection, senderPid, senderRef,
@ -184,11 +196,10 @@ public class ServiceExecutor implements Runnable {
"Unhandled error while processing request: "
+ e.getClass().getCanonicalName()
+ ", message: " + e.getMessage()));
} catch (IOException e1) {
} catch (Exception e1) {
// error while sending error message. Can't do anything now
logger.log(Level.WARNING, "Error during sending error message",
e);
e.printStackTrace();
}
}
}
@ -201,7 +212,7 @@ public class ServiceExecutor implements Runnable {
if (operations == null) {
// TODO: externalize message?
// NOTE: I assume in Erlang sender doesn't get confirmation so
// message will be send
// no message will be send
logger.log(Level.WARNING, "Node '" + name
+ "' received message addressed to non exising mbox: "
+ msg.getRecipientName());
@ -224,8 +235,8 @@ public class ServiceExecutor implements Runnable {
}
if (matchedOperation != null) {
try {
Object result = erlangMbox.getService().getRuntimeWire(
erlangMbox.getBinding()).invoke(matchedOperation,
Object result = nodeElement.getService().getRuntimeWire(
nodeElement.getBinding()).invoke(matchedOperation,
args);
OtpErlangObject response = null;
if (matchedOperation.getOutputType() != null
@ -237,50 +248,65 @@ public class ServiceExecutor implements Runnable {
response = TypeHelpersProxy.toErlang(arrArg);
}
if (response != null) {
OtpNode node = new OtpNode("_response_connector_to_"
+ msg.getSenderPid());
OtpNode node = new OtpNode(
getResponseClientNodeName(msg.getSenderPid()));
OtpMbox mbox = node.createMbox();
mbox.send(msg.getSenderPid(), response);
}
} catch (InvocationTargetException e) {
// FIXME: use linking feature? send some error?
e.printStackTrace();
} catch (IOException e) {
// } catch (IOException e) {
} catch (Exception e) {
// FIXME: log this problem? use linking feature? send error?
e.printStackTrace();
}
} else {
// TODO: externalize message?
// NOTE: don't send error message if mapping not found
logger.log(Level.WARNING, "No mapping for such arguments in '"
+ msg.getRecipientName() + "' operation in '" + name
+ "' node.");
try {
// TODO: externalize message?
// NOTE: don't send error message if mapping not found
logger.log(Level.WARNING,
"No mapping for such arguments in '"
+ msg.getRecipientName()
+ "' operation in '" + name
+ "' node. Recevied arguments: "
+ msg.getMsg());
} catch (OtpErlangDecodeException e) {
}
}
}
}
public void run() {
try {
// TODO: should receive timeout be configured in .composite file?
OtpMsg msg = connection.receiveMsg(RECEIVE_TIMEOUT);
if (msg.getRecipientName().equals(MessageHelper.RPC_MBOX)) {
// NOTE: there's also a timeout, like in reference bindings
OtpMsg msg = null;
if (nodeElement.getBinding().hasTimeout()) {
msg = connection.receiveMsg(nodeElement.getBinding()
.getTimeout());
} else {
msg = connection.receiveMsg();
}
if (msg.getRecipientName().equals(MessageHelper.RPC_MBOX)
&& !nodeElement.getBinding().isMbox()) {
handleRpc(msg);
} else if (msg != null) {
} else if (!msg.getRecipientName().equals(MessageHelper.RPC_MBOX)
&& nodeElement.getBinding().isMbox()) {
handleMsg(msg);
} else {
// message receive timeout
// received wrong message type
}
} catch (IOException e) {
// TODO: externalize message?
logger.log(Level.WARNING, "Problem while receiving message", e);
} catch (OtpErlangExit e) {
// TODO: linking?
e.printStackTrace();
} catch (OtpAuthException e) {
// TODO: cookies?
// TODO: cookies? does this exception occur sometime?
} catch (InterruptedException e) {
// TODO: when it could happen?
e.printStackTrace();
// NOTE: timeout will be logged
// TODO: externalize message?
logger.log(Level.WARNING, "Timeout while waiting for request", e);
} finally {
connection.close();
}

View file

@ -23,8 +23,6 @@ import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import org.apache.tuscany.sca.binding.erlang.impl.exceptions.ErlangException;
import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangTuple;
@ -38,16 +36,16 @@ public class TupleTypeHelper implements TypeHelper {
List<OtpErlangObject> tupleMembers = new ArrayList<OtpErlangObject>();
Field[] fields = forClass.getFields();
for (int i = 0; i < fields.length; i++) {
Object[] args;
Object[] args = null;
try {
args = new Object[] { fields[i].get(object) };
OtpErlangObject member = TypeHelpersProxy.toErlang(args);
tupleMembers.add(member);
} catch (Exception e) {
// TODO: declaring toErlang method with Exception and throwing
// this?
e.printStackTrace();
} catch (IllegalArgumentException e) {
// no problem should occur here
} catch (IllegalAccessException e) {
// and here
}
OtpErlangObject member = TypeHelpersProxy.toErlang(args);
tupleMembers.add(member);
}
OtpErlangObject result = new OtpErlangTuple(tupleMembers
.toArray(new OtpErlangObject[tupleMembers.size()]));
@ -59,14 +57,6 @@ public class TupleTypeHelper implements TypeHelper {
Object result = null;
OtpErlangTuple tuple = (OtpErlangTuple) object;
Field[] fields = forClass.getFields();
if (fields.length != tuple.arity()) {
throw new ErlangException(
"Received tuple with different element count ("
+ tuple.arity() + ") than expected ("
+ fields.length + ")");
// FIXME: JUnit this - received tuple with different element count -
// wrong message, exception!
}
result = forClass.newInstance();
for (int i = 0; i < tuple.arity(); i++) {
OtpErlangObject tupleMember = tuple.elementAt(i);

View file

@ -34,9 +34,9 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.osoa.sca.ServiceRuntimeException;
import com.ericsson.otp.erlang.OtpAuthException;
import com.ericsson.otp.erlang.OtpConnection;
import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangBinary;
import com.ericsson.otp.erlang.OtpErlangBoolean;
@ -49,6 +49,8 @@ import com.ericsson.otp.erlang.OtpErlangString;
import com.ericsson.otp.erlang.OtpErlangTuple;
import com.ericsson.otp.erlang.OtpMbox;
import com.ericsson.otp.erlang.OtpNode;
import com.ericsson.otp.erlang.OtpPeer;
import com.ericsson.otp.erlang.OtpSelf;
/**
* Test is annotated with test runner, which will ignore tests if epmd is not
@ -63,15 +65,12 @@ public class ReferenceServiceTestCase {
private static MboxInterface mboxReference;
private static MboxInterface timeoutMboxReference;
private static MboxInterface cookieMboxReference;
private static ServiceInterface moduleReference;
private static ServiceInterface cookieModuleReference;
private static ServiceInterface invalidCookieModuleReference;
private static ServiceInterface timeoutModuleReference;
private static ServiceInterface clonedModuleReference;
private static OtpNode serNode;
private static OtpNode serCookieNode;
private static OtpMbox serMbox;
private static OtpMbox serCookieMbox;
private static OtpNode refNode;
private static OtpMbox refMbox;
private static Process epmdProcess;
@ -88,17 +87,14 @@ public class ReferenceServiceTestCase {
mboxReference = component.getMboxReference();
timeoutMboxReference = component.getTimeoutMboxReference();
cookieMboxReference = component.getCookieMboxReference();
moduleReference = component.getModuleReference();
cookieModuleReference = component.getCookieModuleReference();
invalidCookieModuleReference = component
.getInvalidCookieModuleReference();
timeoutModuleReference = component.getTimeoutModuleReference();
clonedModuleReference = component.getClonedModuleReference();
serNode = new OtpNode("MboxServer");
serCookieNode = new OtpNode("MboxServer");
serCookieNode.setCookie("cookie");
serMbox = serNode.createMbox("sendArgs");
serCookieMbox = serCookieNode.createMbox("sendArgs");
refNode = new OtpNode("MboxClient");
refMbox = refNode.createMbox("connector_to_SCA_mbox");
} catch (IOException e) {
@ -552,7 +548,7 @@ public class ReferenceServiceTestCase {
*
* @throws Exception
*/
@Test(timeout = 1000)
@Test(timeout = 10000000)
public void testRPC() throws Exception {
String[] result = moduleReference.sayHellos();
assertEquals(2, result.length);
@ -636,38 +632,7 @@ public class ReferenceServiceTestCase {
assertEquals(ErlangException.class, e.getClass());
}
}
/**
* Tests using multiple Erlang modules on one SCA Erlang node
*
* @throws Exception
*/
@Test(timeout = 1000)
public void testMultipleModulesOnNode() throws Exception {
String[] mr = moduleReference.sayHellos();
String[] cmr = clonedModuleReference.sayHellos();
assertEquals("1", mr[0]);
assertEquals("2", mr[1]);
assertEquals("-1", cmr[0]);
assertEquals("-2", cmr[1]);
}
/**
* Tests nodes with duplcated components (the same node and module
* parameters)
*
* @throws Exception
*/
@Test(timeout = 1000)
public void testModuleDuplicatedOnNode() throws Exception {
try {
SCADomain.newInstance("ErlangServiceModuleDuplicate.composite");
} catch (ServiceRuntimeException e) {
assertEquals(ErlangException.class, e.getCause().getClass());
}
}
/**
* Tests mbox with retrieving and answering with basic arguments
*
@ -679,7 +644,7 @@ public class ReferenceServiceTestCase {
args[0] = new OtpErlangString("world");
args[1] = new OtpErlangString("!");
OtpErlangTuple tuple = new OtpErlangTuple(args);
refMbox.send("sayHello", "RPCServer", tuple);
refMbox.send("sayHello", "RPCServerMbox", tuple);
OtpErlangString result = (OtpErlangString) refMbox.receiveMsg()
.getMsg();
assertEquals("Hello world !", result.stringValue());
@ -690,7 +655,7 @@ public class ReferenceServiceTestCase {
*
* @throws Exception
*/
@Test(timeout = 1000)
@Test(timeout = 2000)
public void testMboxWithComplexArgs() throws Exception {
int arg1 = 1;
String arg2 = "arg2";
@ -715,7 +680,7 @@ public class ReferenceServiceTestCase {
argsContent[0] = structuredTuple;
argsContent[1] = list;
OtpErlangTuple args = new OtpErlangTuple(argsContent);
refMbox.send("passComplexArgs", "RPCServer", args);
refMbox.send("passComplexArgs", "RPCServerMbox", args);
OtpErlangObject result = refMbox.receiveMsg().getMsg();
assertEquals(arg1,
((OtpErlangLong) ((OtpErlangTuple) ((OtpErlangTuple) result)
@ -747,7 +712,7 @@ public class ReferenceServiceTestCase {
try {
// timeout exception expected
timeoutMboxReference.sendArgs("");
fail();
fail("Exception expected");
} catch (Exception e) {
assertEquals(ErlangException.class, e.getClass());
assertEquals(e.getCause().getClass(), InterruptedException.class);
@ -791,7 +756,7 @@ public class ReferenceServiceTestCase {
// exception, so expecting one
try {
timeoutModuleReference.sayHellos();
fail();
fail("Exception expected");
} catch (Exception e) {
assertEquals(ErlangException.class, e.getClass());
}
@ -801,19 +766,51 @@ public class ReferenceServiceTestCase {
}
/**
* Tests timeout feature for reference binding RPC
* Tests timeout feature for service side bindings
* @throws Exception
*/
@Test(timeout = 4000)
public void testServiceTimeouts() throws Exception {
OtpSelf self = new OtpSelf("tmp_connector_"
+ System.currentTimeMillis());
OtpPeer peer = new OtpPeer("RPCServerTimeout");
OtpConnection connection = self.connect(peer);
// delay message sending after connecting
Thread.sleep(1000);
// service binding timeout set to 500 so after that time it will give up
// and close connection
try {
connection.send("rex", new OtpErlangString("test"));
fail("Exception expected");
} catch (Exception e) {
assertEquals(IOException.class, e.getClass());
}
connection = self.connect(peer);
// sending message immediately and encountering no connection close
connection.send("rex", new OtpErlangString("test"));
}
/**
* Tests cookie feature for both reference and service bindings RPC
*
* @throws Exception
*/
@Test(timeout = 1000)
public void testReferenceCookies() throws Exception {
// testing wrong cookie
try {
cookieModuleReference.sayHellos();
fail();
invalidCookieModuleReference.sayHellos();
fail("Exception expected");
} catch (Exception e) {
assertEquals(ErlangException.class, e.getClass());
assertEquals(OtpAuthException.class, e.getCause().getClass());
}
}
// testing correct cookie
cookieModuleReference.sayHellos();
}
}

View file

@ -31,6 +31,7 @@ public class ReferenceTestComponentImpl implements ReferenceTestComponent {
private MboxInterface cookieMboxReference;
private ServiceInterface moduleReference;
private ServiceInterface cookieModuleReference;
private ServiceInterface invalidCookieModuleReference;
private ServiceInterface timeoutModuleReference;
private ServiceInterface clonedModuleReference;
@ -59,6 +60,11 @@ public class ReferenceTestComponentImpl implements ReferenceTestComponent {
this.cookieModuleReference = cookieModuleReference;
}
@Reference
public void setInvalidCookieModuleReference(ServiceInterface invalidCookieModuleReference) {
this.invalidCookieModuleReference = invalidCookieModuleReference;
}
@Reference
public void setTimeoutModuleReference(ServiceInterface timeoutModuleReference) {
this.timeoutModuleReference = timeoutModuleReference;
@ -88,6 +94,10 @@ public class ReferenceTestComponentImpl implements ReferenceTestComponent {
public ServiceInterface getCookieModuleReference() {
return cookieModuleReference;
}
public ServiceInterface getInvalidCookieModuleReference() {
return invalidCookieModuleReference;
}
public ServiceInterface getTimeoutModuleReference() {
return timeoutModuleReference;

View file

@ -14,8 +14,8 @@
<tuscany:binding.erlang node="MboxServer" mbox="true" timeout="500"/>
</reference>
<reference name="timeoutMboxReference">
<tuscany:binding.erlang node="MboxServer" mbox="true" timeout="500"/>
<reference name="cookieMboxReference">
<tuscany:binding.erlang node="MboxServer" mbox="true" cookie="cookie"/>
</reference>
<reference name="moduleReference">
@ -23,16 +23,17 @@
</reference>
<reference name="cookieModuleReference">
<tuscany:binding.erlang node="RPCServer" module="hello" cookie="cookie"/>
<tuscany:binding.erlang node="RPCServerCookie" module="hello_cookie" cookie="cookie"/>
</reference>
<reference name="invalidCookieModuleReference">
<tuscany:binding.erlang node="RPCServerCookie" module="hello_cookie" cookie="invalid_cookie"/>
</reference>
<reference name="timeoutModuleReference">
<tuscany:binding.erlang node="RPCServer" module="hello_timeout" timeout="500"/>
<tuscany:binding.erlang node="RPCServerTimeout" module="hello_timeout" timeout="500"/>
</reference>
<reference name="clonedModuleReference">
<tuscany:binding.erlang node="RPCServer" module="hello_clone"/>
</reference>
</component>
</composite>

View file

@ -13,15 +13,7 @@
</service>
<service name="ServiceTestMbox" promote="ServiceTest">
<interface.java interface="org.apache.tuscany.sca.binding.erlang.testing.ServiceTestComponent" />
<tuscany:binding.erlang node="RPCServer" mbox="true"/>
</service>
<component name="ServiceTestClone">
<implementation.java class="org.apache.tuscany.sca.binding.erlang.testing.ServiceTestComponentImplClone" />
</component>
<service name="ServiceTestClone" promote="ServiceTestClone">
<interface.java interface="org.apache.tuscany.sca.binding.erlang.testing.ServiceTestComponent" />
<tuscany:binding.erlang node="RPCServer" module="hello_clone"/>
<tuscany:binding.erlang node="RPCServerMbox" mbox="true"/>
</service>
<component name="ServiceTestTimeout">
@ -29,7 +21,15 @@
</component>
<service name="ServiceTestTimeout" promote="ServiceTestTimeout">
<interface.java interface="org.apache.tuscany.sca.binding.erlang.testing.ServiceTestComponent" />
<tuscany:binding.erlang node="RPCServer" module="hello_timeout"/>
<tuscany:binding.erlang node="RPCServerTimeout" module="hello_timeout" timeout="500"/>
</service>
<component name="ServiceTestCookie">
<implementation.java class="org.apache.tuscany.sca.binding.erlang.testing.ServiceTestComponentImpl" />
</component>
<service name="ServiceTestCookie" promote="ServiceTestCookie">
<interface.java interface="org.apache.tuscany.sca.binding.erlang.testing.ServiceTestComponent" />
<tuscany:binding.erlang node="RPCServerCookie" module="hello_cookie" cookie="cookie"/>
</service>
</composite>

View file

@ -1,23 +0,0 @@
<composite xmlns="http://www.osoa.org/xmlns/sca/1.0"
xmlns:tuscany="http://tuscany.apache.org/xmlns/sca/1.0"
targetNamespace="http://sample"
xmlns:sample="http://sample"
name="ErlangServiceBinding">
<component name="ServiceTest">
<implementation.java class="org.apache.tuscany.sca.binding.erlang.testing.ServiceTestComponentImpl" />
</component>
<service name="ServiceTest" promote="ServiceTest">
<interface.java interface="org.apache.tuscany.sca.binding.erlang.testing.ServiceTestComponent" />
<tuscany:binding.erlang node="DuplicateTest" module="hello"/>
</service>
<component name="ServiceTestClone">
<implementation.java class="org.apache.tuscany.sca.binding.erlang.testing.ServiceTestComponentImplClone" />
</component>
<service name="ServiceTestClone" promote="ServiceTestClone">
<interface.java interface="org.apache.tuscany.sca.binding.erlang.testing.ServiceTestComponent" />
<tuscany:binding.erlang node="DuplicateTest" module="hello"/>
</service>
</composite>

View file

@ -29,6 +29,8 @@ import org.apache.tuscany.sca.assembly.xml.Constants;
*/
public interface ErlangBinding extends Binding {
final long NO_TIMEOUT = 0;
QName BINDING_ERLANG_QNAME = new QName(Constants.SCA10_TUSCANY_NS, "binding.erlang");
String getNode();
@ -50,5 +52,9 @@ public interface ErlangBinding extends Binding {
String getCookie();
void setCookie(String cookie);
boolean hasTimeout();
boolean hasCookie();
}

View file

@ -33,8 +33,6 @@ import org.apache.tuscany.sca.policy.PolicySetAttachPoint;
*/
public class ErlangBindingImpl implements ErlangBinding, PolicySetAttachPoint {
public static final long DEFAULT_TIMEOUT = 10000;
private String node;
private String module;
private boolean mbox;
@ -44,7 +42,7 @@ public class ErlangBindingImpl implements ErlangBinding, PolicySetAttachPoint {
private List<PolicySet> policySets = new ArrayList<PolicySet>();
private IntentAttachPointType intentAttachPointType;
private List<PolicySet> applicablePolicySets = new ArrayList<PolicySet>();
private long timeout = DEFAULT_TIMEOUT;
private long timeout = NO_TIMEOUT;
public String getNode() {
return node;
@ -121,12 +119,8 @@ public class ErlangBindingImpl implements ErlangBinding, PolicySetAttachPoint {
}
public void setTimeout(long timeout) {
// NOTE: 0 timeout will cause setting to default
if (timeout == 0) {
this.timeout = DEFAULT_TIMEOUT;
} else {
this.timeout = timeout;
}
// NOTE: not setting timeout or setting it to 0 will cause no timeout
this.timeout = timeout;
}
public String getCookie() {
@ -137,4 +131,12 @@ public class ErlangBindingImpl implements ErlangBinding, PolicySetAttachPoint {
this.cookie = cookie;
}
public boolean hasTimeout() {
return timeout != ErlangBinding.NO_TIMEOUT;
}
public boolean hasCookie() {
return cookie != null && cookie.length() > 0;
}
}

View file

@ -65,16 +65,6 @@ public class ErlangBindingProcessorTestCase {
+ " </service>"
+ " </component>"
+ "</composite>";
private static final String COMPOSITE_ZERO_TIMEOUT =
"<?xml version=\"1.0\" encoding=\"ASCII\"?>" + "<composite xmlns=\"http://www.osoa.org/xmlns/sca/1.0\" xmlns:tuscany=\"http://tuscany.apache.org/xmlns/sca/1.0\" targetNamespace=\"http://binding-erlang\" name=\"binding-erlang\">"
+ " <component name=\"HelloWorldComponent\">"
+ " <implementation.java class=\"services.HelloWorld\"/>"
+ " <service name=\"HelloWorldService\">"
+ " <tuscany:binding.erlang node=\"SomeNode\" timeout=\"0\"/>"
+ " </service>"
+ " </component>"
+ "</composite>";
private static XMLInputFactory inputFactory;
private static StAXArtifactProcessor<Object> staxProcessor;
@ -110,6 +100,8 @@ public class ErlangBindingProcessorTestCase {
assertEquals("SomeNode", binding.getNode());
assertEquals(1000, binding.getTimeout());
assertEquals("cookie", binding.getCookie());
assertEquals(true, binding.hasTimeout());
assertEquals(true, binding.hasCookie());
}
/**
@ -123,22 +115,10 @@ public class ErlangBindingProcessorTestCase {
Composite composite = (Composite)staxProcessor.read(reader);
ErlangBinding binding =
(ErlangBinding)composite.getComponents().get(0).getServices().get(0).getBindings().get(0);
assertEquals(ErlangBindingImpl.DEFAULT_TIMEOUT, binding.getTimeout());
assertEquals(ErlangBindingImpl.NO_TIMEOUT, binding.getTimeout());
assertEquals(null, binding.getCookie());
}
/**
* Tests "resultTimeout" attribute set to 0
*
* @throws Exception
*/
@Test
public void testLoadZeroTimeout() throws Exception {
XMLStreamReader reader = inputFactory.createXMLStreamReader(new StringReader(COMPOSITE_ZERO_TIMEOUT));
Composite composite = (Composite)staxProcessor.read(reader);
ErlangBinding binding =
(ErlangBinding)composite.getComponents().get(0).getServices().get(0).getBindings().get(0);
assertEquals(ErlangBindingImpl.DEFAULT_TIMEOUT, binding.getTimeout());
assertEquals(false, binding.hasTimeout());
assertEquals(false, binding.hasCookie());
}
}