diff options
Diffstat (limited to 'sca-java-2.x/trunk/modules/endpoint-tribes/src')
2 files changed, 131 insertions, 1 deletions
diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java index ef002d5230..9dd98dedfb 100644 --- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java +++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java @@ -38,11 +38,13 @@ import java.util.logging.Logger; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelReceiver; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.group.GroupChannel; import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor; import org.apache.catalina.tribes.membership.McastService; import org.apache.catalina.tribes.membership.StaticMember; +import org.apache.catalina.tribes.transport.ReceiverBase; import org.apache.tuscany.sca.assembly.Endpoint; import org.apache.tuscany.sca.assembly.EndpointReference; import org.apache.tuscany.sca.core.ExtensionPointRegistry; @@ -76,6 +78,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi private static List<URI> staticRoutes; private String id; + private boolean noMultiCast; private static final Channel createChannel(String address, int port, String bindAddress) { @@ -168,6 +171,10 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi staticRoutes.add(URI.create("tcp://" + st.nextToken())); } } + String mcast = attributes.get("nomcast"); + if (mcast != null) { + noMultiCast = Boolean.valueOf(mcast); + } } public void start() { @@ -179,6 +186,10 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi new ClassLoader[] {ReplicatedEndpointRegistry.class.getClassLoader()}); map.addListener(this); + if (noMultiCast) { + map.getChannel().addInterceptor(new DisableMcastInterceptor()); + } + if (staticRoutes != null) { StaticMembershipInterceptor smi = new StaticMembershipInterceptor(); for (URI staticRoute : staticRoutes) { @@ -194,7 +205,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi smi.setLocalMember(map.getChannel().getLocalMember(false)); map.getChannel().addInterceptor(smi); } - + try { map.getChannel().start(Channel.DEFAULT); } catch (ChannelException e) { @@ -398,6 +409,11 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi mcastService.setPort(MULTICAST_PORT); mcastService.setAddress(MULTICAST_ADDRESS); + +// ChannelReceiver rcv = channel.getChannelReceiver(); +// ReceiverBase rcvb = (ReceiverBase)rcv; +// rcvb.setPort(10480); + InetAddress localhost = InetAddress.getLocalHost(); // REVIEW: In my case, there are multiple IP addresses diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java new file mode 100644 index 0000000000..95d407c106 --- /dev/null +++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java @@ -0,0 +1,114 @@ +/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.tribes;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.tuscany.sca.assembly.AssemblyFactory;
+import org.apache.tuscany.sca.assembly.Binding;
+import org.apache.tuscany.sca.assembly.Component;
+import org.apache.tuscany.sca.assembly.Endpoint;
+import org.apache.tuscany.sca.assembly.impl.SCABindingFactoryImpl;
+import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry;
+import org.apache.tuscany.sca.core.FactoryExtensionPoint;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+// Ignore so its not run in the build yet till its working
+@Ignore
+public class MultiRegTestCase {
+
+// @Test
+// public void testTwoNodesMultiCast() throws InterruptedException {
+// DefaultExtensionPointRegistry extensionPoints = new DefaultExtensionPointRegistry();
+// FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class);
+// AssemblyFactory assemblyFactory = factories.getFactory(AssemblyFactory.class);
+//
+// ReplicatedEndpointRegistry reg1 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar");
+// reg1.start();
+//
+// Endpoint ep1 = assemblyFactory.createEndpoint();
+// Component comp = assemblyFactory.createComponent();
+// ep1.setComponent(comp);
+// ep1.setService(assemblyFactory.createComponentService());
+// Binding b = new SCABindingFactoryImpl().createSCABinding();
+// ep1.setBinding(b);
+// ep1.setURI("ep1uri");
+// reg1.addEndpoint(ep1);
+//
+// Endpoint ep1p = reg1.getEndpoint("ep1uri");
+// Assert.assertNotNull(ep1p);
+// Assert.assertEquals("ep1uri", ep1p.getURI());
+//
+// ReplicatedEndpointRegistry reg2 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar");
+// reg2.start();
+// Thread.sleep(5000);
+//
+// Endpoint ep1p2 = reg2.getEndpoint("ep1uri");
+// Assert.assertNotNull(ep1p2);
+// Assert.assertEquals("ep1uri", ep1p2.getURI());
+//
+// reg1.stop();
+// reg2.stop();
+// }
+
+ @Test
+ public void testTwoNodesStaticNoMultiCast() throws InterruptedException {
+ DefaultExtensionPointRegistry extensionPoints = new DefaultExtensionPointRegistry();
+ FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class);
+ AssemblyFactory assemblyFactory = factories.getFactory(AssemblyFactory.class);
+
+ Map<String, String> attrs1 = new HashMap<String, String>();
+ attrs1.put("nomcast", "true");
+ attrs1.put("routes", "9.167.197.91:4001");
+ ReplicatedEndpointRegistry reg1 = new ReplicatedEndpointRegistry(extensionPoints, attrs1, "foo", "bar");
+ reg1.start();
+
+ Endpoint ep1 = assemblyFactory.createEndpoint();
+ Component comp = assemblyFactory.createComponent();
+ ep1.setComponent(comp);
+ ep1.setService(assemblyFactory.createComponentService());
+ Binding b = new SCABindingFactoryImpl().createSCABinding();
+ ep1.setBinding(b);
+ ep1.setURI("ep1uri");
+ reg1.addEndpoint(ep1);
+
+ Endpoint ep1p = reg1.getEndpoint("ep1uri");
+ Assert.assertNotNull(ep1p);
+ Assert.assertEquals("ep1uri", ep1p.getURI());
+
+ Map<String, String> attrs2 = new HashMap<String, String>();
+ attrs2.put("nomcast", "true");
+ attrs2.put("routes", "9.167.197.91:4000");
+ ReplicatedEndpointRegistry reg2 = new ReplicatedEndpointRegistry(extensionPoints, attrs2, "foo", "bar");
+ reg2.start();
+ Thread.sleep(15000);
+
+ Endpoint ep1p2 = reg2.getEndpoint("ep1uri");
+ Assert.assertNotNull(ep1p2);
+ Assert.assertEquals("ep1uri", ep1p2.getURI());
+
+ reg1.stop();
+ reg2.stop();
+ }
+
+}
|