From 0e22d32a94dacd6e9e59bf69302d783a5f222f6c Mon Sep 17 00:00:00 2001 From: antelder Date: Tue, 8 Dec 2009 12:39:15 +0000 Subject: Add a quick hack to enable disabling multicast. Will need to clean up all the registry configurations once its working more properly git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@888376 13f79535-47bb-0310-9956-ffa450edef68 --- .../tribes/ReplicatedEndpointRegistry.java | 18 +++- .../sca/endpoint/tribes/MultiRegTestCase.java | 114 +++++++++++++++++++++ 2 files changed, 131 insertions(+), 1 deletion(-) create mode 100644 sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java (limited to 'sca-java-2.x/trunk/modules/endpoint-tribes/src') diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java index ef002d5230..9dd98dedfb 100644 --- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java +++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java @@ -38,11 +38,13 @@ import java.util.logging.Logger; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelReceiver; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.group.GroupChannel; import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor; import org.apache.catalina.tribes.membership.McastService; import org.apache.catalina.tribes.membership.StaticMember; +import org.apache.catalina.tribes.transport.ReceiverBase; import org.apache.tuscany.sca.assembly.Endpoint; import org.apache.tuscany.sca.assembly.EndpointReference; import org.apache.tuscany.sca.core.ExtensionPointRegistry; @@ -76,6 +78,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi private static List staticRoutes; private String id; + private boolean noMultiCast; private static final Channel createChannel(String address, int port, String bindAddress) { @@ -168,6 +171,10 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi staticRoutes.add(URI.create("tcp://" + st.nextToken())); } } + String mcast = attributes.get("nomcast"); + if (mcast != null) { + noMultiCast = Boolean.valueOf(mcast); + } } public void start() { @@ -179,6 +186,10 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi new ClassLoader[] {ReplicatedEndpointRegistry.class.getClassLoader()}); map.addListener(this); + if (noMultiCast) { + map.getChannel().addInterceptor(new DisableMcastInterceptor()); + } + if (staticRoutes != null) { StaticMembershipInterceptor smi = new StaticMembershipInterceptor(); for (URI staticRoute : staticRoutes) { @@ -194,7 +205,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi smi.setLocalMember(map.getChannel().getLocalMember(false)); map.getChannel().addInterceptor(smi); } - + try { map.getChannel().start(Channel.DEFAULT); } catch (ChannelException e) { @@ -398,6 +409,11 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi mcastService.setPort(MULTICAST_PORT); mcastService.setAddress(MULTICAST_ADDRESS); + +// ChannelReceiver rcv = channel.getChannelReceiver(); +// ReceiverBase rcvb = (ReceiverBase)rcv; +// rcvb.setPort(10480); + InetAddress localhost = InetAddress.getLocalHost(); // REVIEW: In my case, there are multiple IP addresses diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java new file mode 100644 index 0000000000..95d407c106 --- /dev/null +++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.endpoint.tribes; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.tuscany.sca.assembly.AssemblyFactory; +import org.apache.tuscany.sca.assembly.Binding; +import org.apache.tuscany.sca.assembly.Component; +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.assembly.impl.SCABindingFactoryImpl; +import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry; +import org.apache.tuscany.sca.core.FactoryExtensionPoint; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +// Ignore so its not run in the build yet till its working +@Ignore +public class MultiRegTestCase { + +// @Test +// public void testTwoNodesMultiCast() throws InterruptedException { +// DefaultExtensionPointRegistry extensionPoints = new DefaultExtensionPointRegistry(); +// FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class); +// AssemblyFactory assemblyFactory = factories.getFactory(AssemblyFactory.class); +// +// ReplicatedEndpointRegistry reg1 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar"); +// reg1.start(); +// +// Endpoint ep1 = assemblyFactory.createEndpoint(); +// Component comp = assemblyFactory.createComponent(); +// ep1.setComponent(comp); +// ep1.setService(assemblyFactory.createComponentService()); +// Binding b = new SCABindingFactoryImpl().createSCABinding(); +// ep1.setBinding(b); +// ep1.setURI("ep1uri"); +// reg1.addEndpoint(ep1); +// +// Endpoint ep1p = reg1.getEndpoint("ep1uri"); +// Assert.assertNotNull(ep1p); +// Assert.assertEquals("ep1uri", ep1p.getURI()); +// +// ReplicatedEndpointRegistry reg2 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar"); +// reg2.start(); +// Thread.sleep(5000); +// +// Endpoint ep1p2 = reg2.getEndpoint("ep1uri"); +// Assert.assertNotNull(ep1p2); +// Assert.assertEquals("ep1uri", ep1p2.getURI()); +// +// reg1.stop(); +// reg2.stop(); +// } + + @Test + public void testTwoNodesStaticNoMultiCast() throws InterruptedException { + DefaultExtensionPointRegistry extensionPoints = new DefaultExtensionPointRegistry(); + FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class); + AssemblyFactory assemblyFactory = factories.getFactory(AssemblyFactory.class); + + Map attrs1 = new HashMap(); + attrs1.put("nomcast", "true"); + attrs1.put("routes", "9.167.197.91:4001"); + ReplicatedEndpointRegistry reg1 = new ReplicatedEndpointRegistry(extensionPoints, attrs1, "foo", "bar"); + reg1.start(); + + Endpoint ep1 = assemblyFactory.createEndpoint(); + Component comp = assemblyFactory.createComponent(); + ep1.setComponent(comp); + ep1.setService(assemblyFactory.createComponentService()); + Binding b = new SCABindingFactoryImpl().createSCABinding(); + ep1.setBinding(b); + ep1.setURI("ep1uri"); + reg1.addEndpoint(ep1); + + Endpoint ep1p = reg1.getEndpoint("ep1uri"); + Assert.assertNotNull(ep1p); + Assert.assertEquals("ep1uri", ep1p.getURI()); + + Map attrs2 = new HashMap(); + attrs2.put("nomcast", "true"); + attrs2.put("routes", "9.167.197.91:4000"); + ReplicatedEndpointRegistry reg2 = new ReplicatedEndpointRegistry(extensionPoints, attrs2, "foo", "bar"); + reg2.start(); + Thread.sleep(15000); + + Endpoint ep1p2 = reg2.getEndpoint("ep1uri"); + Assert.assertNotNull(ep1p2); + Assert.assertEquals("ep1uri", ep1p2.getURI()); + + reg1.stop(); + reg2.stop(); + } + +} -- cgit v1.2.3