diff options
author | rfeng <rfeng@13f79535-47bb-0310-9956-ffa450edef68> | 2009-10-12 19:31:48 +0000 |
---|---|---|
committer | rfeng <rfeng@13f79535-47bb-0310-9956-ffa450edef68> | 2009-10-12 19:31:48 +0000 |
commit | d3937102e3ee45a413aaf1f3aa19020d23d8b721 (patch) | |
tree | 80dda4f00db4110db7747147d7aadb6e0ce8e740 /java/sca/modules | |
parent | f1677566975979192ad0aafaba06f2d1fd1307bf (diff) |
Make sure that entries are replicated to all nodes (not just at least a backup node)
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@824468 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/sca/modules')
-rw-r--r-- | java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java | 21 | ||||
-rw-r--r-- | java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java | 16 | ||||
-rw-r--r-- | java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java | 10 | ||||
-rw-r--r-- | java/sca/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistryTestCase.java (renamed from java/sca/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/Test.java) | 48 |
4 files changed, 62 insertions, 33 deletions
diff --git a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java index 91aee4d585..98c6739854 100644 --- a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java +++ b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java @@ -685,7 +685,8 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb } public void mapMemberAdded(Member member) { - if (member.equals(getChannel().getLocalMember(false))) + Member self = getChannel().getLocalMember(false); + if (member.equals(self)) return; boolean memberAdded = false; //select a backup node if we don't have one @@ -703,11 +704,13 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb MapEntry entry = (MapEntry)super.get(e.getKey()); if (entry == null) continue; - if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) { + // if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) { + // [rfeng] Change the behavior to replicate to all nodes + if (entry.isPrimary() && self.equals(entry.getPrimary())) { try { - Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes()); entry.setBackupNodes(backup); - entry.setPrimary(channel.getLocalMember(false)); + entry.setPrimary(self); } catch (ChannelException x) { log.error("Unable to select backup node.", x); } //catch @@ -765,7 +768,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb if (log.isDebugEnabled()) log.debug("[1] Primary choosing a new backup"); try { - Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes()); entry.setBackupNodes(backup); entry.setPrimary(channel.getLocalMember(false)); } catch (ChannelException x) { @@ -795,7 +798,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb entry.setPrimary(channel.getLocalMember(false)); entry.setBackup(false); entry.setProxy(false); - Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes()); entry.setBackupNodes(backup); if (mapOwner != null) mapOwner.objectMadePrimay(entry.getKey(), entry.getValue()); @@ -830,7 +833,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb return members[node]; } - protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException; + protected abstract Member[] publishEntryInfo(Object key, Object value, Member[] backupNodes) throws ChannelException; public void heartbeat() { try { @@ -913,7 +916,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb } if (entry.isBackup()) { //select a new backup node - backup = publishEntryInfo(key, entry.getValue()); + backup = publishEntryInfo(key, entry.getValue(), entry.getBackupNodes()); } else if (entry.isProxy()) { //invalidate the previous primary msg = @@ -994,7 +997,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb old = remove(key); try { if (notify) { - Member[] backup = publishEntryInfo(key, value); + Member[] backup = publishEntryInfo(key, value, entry.getBackupNodes()); entry.setBackupNodes(backup); } } catch (ChannelException x) { diff --git a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java index 09dcc011ee..a2823bb88c 100644 --- a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java +++ b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java @@ -75,6 +75,8 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi private ExtensionPointRegistry registry; private ReplicatedMap map; private static List<URL> staticRoutes; + + private String id; private static final Channel createChannel(String address, int port, String bindAddress) { @@ -103,7 +105,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi String domainURI) { this.registry = registry; this.domainURI = domainURI; - + this.id = "[" + System.identityHashCode(this) + "]"; getParameters(domainRegistryURI); } @@ -207,7 +209,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi try { channel.stop(Channel.DEFAULT); } catch (ChannelException e) { - throw new IllegalStateException(e); + logger.log(Level.WARNING, e.getMessage(), e); } map = null; } @@ -336,6 +338,10 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi public void removeListener(EndpointListener listener) { listeners.remove(listener); } + + public void replicate(boolean complete) { + map.replicate(complete); + } public void updateEndpoint(String uri, Endpoint endpoint) { Endpoint oldEndpoint = getEndpoint(uri); @@ -348,7 +354,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi public void entryAdded(Object key, Object value) { MapEntry entry = (MapEntry)value; if (!isLocal(entry)) { - logger.info("Remote endpoint added: " + entry.getValue()); + logger.info(id + " Remote endpoint added: " + entry.getValue()); } Endpoint newEp = (Endpoint)entry.getValue(); for (EndpointListener listener : listeners) { @@ -359,7 +365,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi public void entryRemoved(Object key, Object value) { MapEntry entry = (MapEntry)value; if (!isLocal(entry)) { - logger.info("Remote endpoint removed: " + entry.getValue()); + logger.info(id + " Remote endpoint removed: " + entry.getValue()); } Endpoint oldEp = (Endpoint)entry.getValue(); for (EndpointListener listener : listeners) { @@ -371,7 +377,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi MapEntry oldEntry = (MapEntry)oldValue; MapEntry newEntry = (MapEntry)newValue; if (!isLocal(newEntry)) { - logger.info("Remote endpoint updated: " + newEntry.getValue()); + logger.info(id + " Remote endpoint updated: " + newEntry.getValue()); } Endpoint oldEp = (Endpoint)oldEntry.getValue(); Endpoint newEp = (Endpoint)newEntry.getValue(); diff --git a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java index af0bb6fb44..762407604d 100644 --- a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java +++ b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java @@ -97,7 +97,7 @@ public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback, * @return Member - the backup node * @throws ChannelException */ - protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException { + protected Member[] publishEntryInfo(Object key, Object value, Member[] backupNodes) throws ChannelException { if (!(key instanceof Serializable && value instanceof Serializable)) return new Member[0]; //select a backup node @@ -106,12 +106,18 @@ public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback, if (backup == null || backup.length == 0) return null; + // Set the receivers to these members that are not in the backup nodes yet + Member[] members = backup; + if (backupNodes != null) { + members = getMapMembersExcl(backupNodes); + } + //publish the data out to all nodes MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable)key, (Serializable)value, null, channel.getLocalMember(false), backup); - getChannel().send(getMapMembers(), msg, getChannelSendOptions()); + getChannel().send(members, msg, getChannelSendOptions()); return backup; } diff --git a/java/sca/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/Test.java b/java/sca/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistryTestCase.java index 8d60608851..a574ba9328 100644 --- a/java/sca/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/Test.java +++ b/java/sca/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistryTestCase.java @@ -23,42 +23,56 @@ import org.apache.tuscany.sca.assembly.AssemblyFactory; import org.apache.tuscany.sca.assembly.Endpoint;
import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
-public class Test {
+public class ReplicatedEndpointRegistryTestCase {
- public static void main(String[] args) throws InterruptedException {
+ @Test
+ @Ignore("Ignore this test case for now as it might be sensitive to the multicast settings for a multi-homed machine")
+ public void testReplicate() throws InterruptedException {
DefaultExtensionPointRegistry extensionPoints = new DefaultExtensionPointRegistry();
FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class);
AssemblyFactory assemblyFactory = factories.getFactory(AssemblyFactory.class);
-
+
ReplicatedEndpointRegistry ep1 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar");
System.out.println("ep1 is: " + ep1);
ep1.start();
-
+
Endpoint e1 = assemblyFactory.createEndpoint();
e1.setURI("e1uri");
- e1.setExtensionPointRegistry(null);
+ e1.setExtensionPointRegistry(extensionPoints);
ep1.addEndpoint(e1);
-
+
Endpoint e1p = ep1.getEndpoint("e1uri");
- System.out.println(e1p);
-
+ System.out.println("EP1 in Registry 1: " + e1p);
+ Assert.assertNotNull(e1p);
+
ReplicatedEndpointRegistry ep2 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar");
System.out.println("ep2 is: " + ep2);
ep2.start();
- Thread.sleep(10000);
-
+ Thread.sleep(5000);
+
Endpoint e1p2 = ep2.getEndpoint("e1uri");
- System.out.println(e1p2);
-
-
+ System.out.println("EP1 in Registry 2: " + e1p2);
+ Assert.assertNotNull(e1p2);
+
ReplicatedEndpointRegistry ep3 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar");
System.out.println("ep3 is: " + ep3);
ep3.start();
- Thread.sleep(10000);
-
+ Thread.sleep(5000);
+
Endpoint e1p3 = ep3.getEndpoint("e1uri");
- System.out.println(e1p3);
-
+ System.out.println("EP1 in Registry 3: " + e1p3);
+ Assert.assertNotNull(e1p3);
+
+ ep1.stop();
+ ep2.stop();
+ ep3.stop();
+ }
+
+ public static void main(String[] args) throws Exception {
+ new ReplicatedEndpointRegistryTestCase().testReplicate();
}
}
|