diff options
Diffstat (limited to '')
4 files changed, 200 insertions, 101 deletions
diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java index 47b3450594..fe683af025 100644 --- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java +++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java @@ -708,7 +708,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb // [rfeng] Change the behavior to replicate to all nodes if (entry.isPrimary() && self.equals(entry.getPrimary())) { try { - Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes()); + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); entry.setBackupNodes(backup); entry.setPrimary(self); } catch (ChannelException x) { @@ -768,7 +768,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb if (log.isDebugEnabled()) log.debug("[1] Primary choosing a new backup"); try { - Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes()); + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); entry.setBackupNodes(backup); entry.setPrimary(channel.getLocalMember(false)); } catch (ChannelException x) { @@ -798,7 +798,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb entry.setPrimary(channel.getLocalMember(false)); entry.setBackup(false); entry.setProxy(false); - Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes()); + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); entry.setBackupNodes(backup); if (mapOwner != null) mapOwner.objectMadePrimay(entry.getKey(), entry.getValue()); @@ -833,7 +833,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb return members[node]; } - protected abstract Member[] publishEntryInfo(Object key, Object value, Member[] backupNodes) throws ChannelException; + protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException; public void heartbeat() { try { @@ -916,7 +916,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb } if (entry.isBackup()) { //select a new backup node - backup = publishEntryInfo(key, entry.getValue(), entry.getBackupNodes()); + backup = publishEntryInfo(key, entry.getValue()); } else if (entry.isProxy()) { //invalidate the previous primary msg = @@ -997,7 +997,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb old = remove(key); try { if (notify) { - Member[] backup = publishEntryInfo(key, value, entry.getBackupNodes()); + Member[] backup = publishEntryInfo(key, value); entry.setBackupNodes(backup); } } catch (ChannelException x) { diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java index 0c2144ea49..42d2eda6a3 100644 --- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java +++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java @@ -38,11 +38,13 @@ import java.util.logging.Logger; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelReceiver; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.group.GroupChannel; import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor; import org.apache.catalina.tribes.membership.McastService; import org.apache.catalina.tribes.membership.StaticMember; +import org.apache.catalina.tribes.transport.ReceiverBase; import org.apache.tuscany.sca.assembly.Endpoint; import org.apache.tuscany.sca.assembly.EndpointReference; import org.apache.tuscany.sca.core.ExtensionPointRegistry; @@ -67,6 +69,9 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi private String address = MULTICAST_ADDRESS; private String bind = null; private int timeout = 50; + private int receiverPort = 4000; + private int receiverAutoBind = 100; + private List<URI> staticRoutes; private final static String DEFAULT_DOMAIN_URI = "http://tuscany.apache.org/sca/1.1/domains/default"; private String domainURI = DEFAULT_DOMAIN_URI; @@ -75,7 +80,6 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi private ExtensionPointRegistry registry; private ReplicatedMap map; - private static List<URI> staticRoutes; private String id; private boolean noMultiCast; @@ -175,6 +179,14 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi if (mcast != null) { noMultiCast = Boolean.valueOf(mcast); } + String recvPort = attributes.get("receiverPort"); + if (recvPort != null) { + receiverPort = Integer.parseInt(recvPort); + } + String recvAutoBind = attributes.get("receiverAutoBind"); + if (recvAutoBind != null) { + receiverAutoBind = Integer.parseInt(recvAutoBind); + } } public void start() { @@ -190,9 +202,24 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi if (noMultiCast) { map.getChannel().addInterceptor(new DisableMcastInterceptor()); } - - // http://www.mail-archive.com/users@tomcat.apache.org/msg24873.html - int port = channel.getChannelReceiver().getPort(); + + // Configure the receiver ports + ChannelReceiver receiver = channel.getChannelReceiver(); + if (receiver instanceof ReceiverBase) { + ((ReceiverBase)receiver).setAutoBind(receiverAutoBind); + ((ReceiverBase)receiver).setPort(receiverPort); + } + + /* + Object sender = channel.getChannelSender(); + if (sender instanceof ReplicationTransmitter) { + sender = ((ReplicationTransmitter)sender).getTransport(); + } + if (sender instanceof AbstractSender) { + ((AbstractSender)sender).setKeepAliveCount(0); + ((AbstractSender)sender).setMaxRetryAttempts(5); + } + */ if (staticRoutes != null) { StaticMembershipInterceptor smi = new StaticMembershipInterceptor(); @@ -200,12 +227,12 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi Member member; try { // The port has to match the receiver port - member = new StaticMember(staticRoute.getHost(), port, 5000); + member = new StaticMember(staticRoute.getHost(), staticRoute.getPort(), 5000); } catch (IOException e) { throw new RuntimeException(e); } smi.addStaticMember(member); - logger.info("Added static route: " + staticRoute.getHost() + ":" + port); + logger.info("Added static route: " + staticRoute.getHost() + ":" + staticRoute.getPort()); } smi.setLocalMember(map.getChannel().getLocalMember(false)); map.getChannel().addInterceptor(smi); @@ -410,6 +437,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi logger.info(id + " Remote endpoint removed: " + entry.getValue()); } Endpoint oldEp = (Endpoint)entry.getValue(); + ((RuntimeEndpoint) oldEp).bind(registry, this); for (EndpointListener listener : listeners) { listener.endpointRemoved(oldEp); } diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java index 762407604d..669ad82192 100644 --- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java +++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java @@ -17,6 +17,8 @@ package org.apache.tuscany.sca.endpoint.tribes; import java.io.Serializable; +import java.util.Iterator; +import java.util.Map; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; @@ -97,29 +99,65 @@ public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback, * @return Member - the backup node * @throws ChannelException */ - protected Member[] publishEntryInfo(Object key, Object value, Member[] backupNodes) throws ChannelException { + protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException { if (!(key instanceof Serializable && value instanceof Serializable)) return new Member[0]; //select a backup node - Member[] backup = getMapMembers(); + Member[] members = getMapMembers(); - if (backup == null || backup.length == 0) - return null; - - // Set the receivers to these members that are not in the backup nodes yet - Member[] members = backup; - if (backupNodes != null) { - members = getMapMembersExcl(backupNodes); + if (members == null || members.length == 0) { + return new Member[0]; } //publish the data out to all nodes MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable)key, (Serializable)value, - null, channel.getLocalMember(false), backup); + null, channel.getLocalMember(false), members); getChannel().send(members, msg, getChannelSendOptions()); - return backup; + return members; + } + + /** + * Override the base method to look up existing entries only + */ + public Object get(Object key) { + MapEntry entry = super.getInternal(key); + if (log.isTraceEnabled()) + log.trace("Requesting id:" + key + " entry:" + entry); + if (entry == null) { + return null; + } + return entry.getValue(); } + /** + * Override the base method to remove all entries owned by the member that disappeared + */ + public void memberDisappeared(Member member) { + boolean removed = false; + synchronized (mapMembers) { + removed = (mapMembers.remove(member) != null); + if (!removed) { + if (log.isDebugEnabled()) + log.debug("Member[" + member + "] disappeared, but was not present in the map."); + return; //the member was not part of our map. + } + } + + Iterator<Map.Entry<Object, Object>> i = super.entrySetFull().iterator(); + while (i.hasNext()) { + Map.Entry<Object, Object> e = i.next(); + MapEntry entry = (MapEntry)super.getInternal(e.getKey()); + if (entry == null) { + continue; + } + if (member.equals(entry.getPrimary())) { + if (log.isDebugEnabled()) + log.debug("[2] Primary disappeared"); + i.remove(); + } //end if + } //while + } } diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java index d329ebd066..a470c47ba0 100644 --- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java +++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java @@ -19,6 +19,7 @@ package org.apache.tuscany.sca.endpoint.tribes;
+import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
@@ -28,109 +29,141 @@ import org.apache.tuscany.sca.assembly.Component; import org.apache.tuscany.sca.assembly.Endpoint;
import org.apache.tuscany.sca.assembly.SCABindingFactory;
import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
+import org.apache.tuscany.sca.runtime.EndpointListener;
+import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
import org.junit.Assert;
-import org.junit.Ignore;
+import org.junit.BeforeClass;
import org.junit.Test;
// Ignore so its not run in the build yet till its working
-@Ignore
-public class MultiRegTestCase {
-
-// @Test
-// public void testTwoNodesMultiCast() throws InterruptedException {
-// DefaultExtensionPointRegistry extensionPoints = new DefaultExtensionPointRegistry();
-// FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class);
-// AssemblyFactory assemblyFactory = factories.getFactory(AssemblyFactory.class);
-//
-// ReplicatedEndpointRegistry reg1 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar");
-// reg1.start();
-//
-// Endpoint ep1 = assemblyFactory.createEndpoint();
-// Component comp = assemblyFactory.createComponent();
-// ep1.setComponent(comp);
-// ep1.setService(assemblyFactory.createComponentService());
-// Binding b = new SCABindingFactoryImpl().createSCABinding();
-// ep1.setBinding(b);
-// ep1.setURI("ep1uri");
-// reg1.addEndpoint(ep1);
-//
-// Endpoint ep1p = reg1.getEndpoint("ep1uri");
-// Assert.assertNotNull(ep1p);
-// Assert.assertEquals("ep1uri", ep1p.getURI());
-//
-// ReplicatedEndpointRegistry reg2 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar");
-// reg2.start();
-// Thread.sleep(5000);
-//
-// Endpoint ep1p2 = reg2.getEndpoint("ep1uri");
-// Assert.assertNotNull(ep1p2);
-// Assert.assertEquals("ep1uri", ep1p2.getURI());
-//
-// reg1.stop();
-// reg2.stop();
-// }
+public class MultiRegTestCase implements EndpointListener {
+ private static ExtensionPointRegistry extensionPoints;
+ private static AssemblyFactory assemblyFactory;
+ private static SCABindingFactory scaBindingFactory;
- @Test
- public void testTwoNodesStaticNoMultiCast() throws InterruptedException {
- DefaultExtensionPointRegistry extensionPoints = new DefaultExtensionPointRegistry();
+ @BeforeClass
+ public static void init() {
+ extensionPoints = new DefaultExtensionPointRegistry();
FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class);
- AssemblyFactory assemblyFactory = factories.getFactory(AssemblyFactory.class);
+ assemblyFactory = factories.getFactory(AssemblyFactory.class);
+ scaBindingFactory = factories.getFactory(SCABindingFactory.class);
+ }
+
+ @Test
+ public void testReplication() throws Exception {
+ RuntimeEndpoint ep1 = createEndpoint("ep1uri");
+
+ String host = InetAddress.getLocalHost().getHostAddress();
+ String bind = null; // "9.65.158.31";
+ String port1 = "8085";
+ String port2 = "8086";
+ String port3 = "8087";
+ String range = "1";
Map<String, String> attrs1 = new HashMap<String, String>();
- attrs1.put("nomcast", "true");
- attrs1.put("routes", "9.167.197.91:4001 9.167.197.91:4002");
+ // attrs1.put("nomcast", "true");
+ attrs1.put("bind", bind);
+ attrs1.put("receiverPort", port1);
+ attrs1.put("receiverAutoBind", range);
+ // attrs1.put("routes", host + ":" + port2 + " " + host + ":" + port3);
ReplicatedEndpointRegistry reg1 = new ReplicatedEndpointRegistry(extensionPoints, attrs1, "foo", "bar");
+ reg1.addListener(this);
reg1.start();
- Endpoint ep1 = assemblyFactory.createEndpoint();
- Component comp = assemblyFactory.createComponent();
- ep1.setComponent(comp);
- ep1.setService(assemblyFactory.createComponentService());
- Binding b = factories.getFactory(SCABindingFactory.class).createSCABinding();
- ep1.setBinding(b);
- ep1.setURI("ep1uri");
- reg1.addEndpoint(ep1);
-
- Endpoint ep1p = reg1.getEndpoint("ep1uri");
- Assert.assertNotNull(ep1p);
- Assert.assertEquals("ep1uri", ep1p.getURI());
-
Map<String, String> attrs2 = new HashMap<String, String>();
- attrs2.put("nomcast", "true");
- attrs2.put("routes", "9.167.197.91:4000");
+ // attrs2.put("nomcast", "true");
+ attrs1.put("bind", bind);
+ attrs2.put("receiverPort", port2);
+ attrs2.put("receiverAutoBind", range);
+ // attrs2.put("routes", host + ":"+port1);
ReplicatedEndpointRegistry reg2 = new ReplicatedEndpointRegistry(extensionPoints, attrs2, "foo", "bar");
+ reg2.addListener(this);
reg2.start();
-
- System.out.println("wait");
- Thread.sleep(10000);
- System.out.println("run");
-
- Endpoint ep1p2 = reg2.getEndpoint("ep1uri");
- Assert.assertNotNull(ep1p2);
- Assert.assertEquals("ep1uri", ep1p2.getURI());
Map<String, String> attrs3 = new HashMap<String, String>();
- attrs3.put("nomcast", "true");
- attrs3.put("routes", "9.167.197.91:4000");
+ // attrs3.put("nomcast", "true");
+ attrs1.put("bind", bind);
+ attrs3.put("receiverPort", port3);
+ attrs3.put("receiverAutoBind", range);
+ // attrs3.put("routes", host + ":"+port1);
ReplicatedEndpointRegistry reg3 = new ReplicatedEndpointRegistry(extensionPoints, attrs3, "foo", "bar");
+ reg3.addListener(this);
reg3.start();
-
- System.out.println("wait");
- Thread.sleep(5000);
- System.out.println("run");
- Endpoint ep1p3 = reg3.getEndpoint("ep1uri");
- Assert.assertNotNull(ep1p3);
- Assert.assertEquals("ep1uri", ep1p3.getURI());
+ ep1.bind(extensionPoints, reg1);
+ reg1.addEndpoint(ep1);
+ assertExists(reg1, "ep1uri");
+ assertExists(reg2, "ep1uri");
+ assertExists(reg3, "ep1uri");
+
+ RuntimeEndpoint ep2 = createEndpoint("ep2uri");
+ ep2.bind(extensionPoints, reg2);
+ reg2.addEndpoint(ep2);
+ assertExists(reg2, "ep2uri");
+ assertExists(reg1, "ep2uri");
+ assertExists(reg3, "ep2uri");
+ reg1.stop();
+ Thread.sleep(6000);
+ Assert.assertNull(reg2.getEndpoint("ep1uri"));
+ Assert.assertNull(reg3.getEndpoint("ep1uri"));
+ assertExists(reg2, "ep2uri");
+ assertExists(reg3, "ep2uri");
+
+ reg1.start();
+ ep1.bind(extensionPoints, reg1);
+ reg1.addEndpoint(ep1);
+ assertExists(reg1, "ep1uri");
+ assertExists(reg2, "ep1uri");
+ assertExists(reg3, "ep1uri");
- System.out.println("wait2");
- Thread.sleep(5000);
- System.out.println("end");
reg1.stop();
reg2.stop();
reg3.stop();
+ System.out.println(); // closed
+ }
+
+ private Endpoint assertExists(ReplicatedEndpointRegistry reg, String uri) throws InterruptedException {
+ Endpoint ep = null;
+ int count = 0;
+ while (ep == null && count < 15) {
+ ep = reg.getEndpoint(uri);
+ Thread.sleep(1000);
+ count++;
+ System.out.println(reg + ": tries=" + count);
+ }
+ Assert.assertNotNull(ep);
+ Assert.assertEquals(uri, ep.getURI());
+ return ep;
+ }
+
+ private RuntimeEndpoint createEndpoint(String uri) {
+ RuntimeEndpoint ep = (RuntimeEndpoint) assemblyFactory.createEndpoint();
+ Component comp = assemblyFactory.createComponent();
+ ep.setComponent(comp);
+ ep.setService(assemblyFactory.createComponentService());
+ Binding b = scaBindingFactory.createSCABinding();
+ ep.setBinding(b);
+ ep.setURI(uri);
+ return ep;
+ }
+
+ private void print(String prefix, Endpoint ep) {
+ System.out.println(prefix + ": "+ep);
+ }
+
+ public void endpointAdded(Endpoint endpoint) {
+ print("Added", endpoint);
+ }
+
+ public void endpointRemoved(Endpoint endpoint) {
+ print("Removed", endpoint);
+ }
+
+ public void endpointUpdated(Endpoint oldEndpoint, Endpoint newEndpoint) {
+ print("Updated", newEndpoint);
}
}
|