summaryrefslogtreecommitdiffstats
path: root/java/sca/modules/endpoint-tribes
diff options
context:
space:
mode:
authorrfeng <rfeng@13f79535-47bb-0310-9956-ffa450edef68>2009-10-12 19:31:48 +0000
committerrfeng <rfeng@13f79535-47bb-0310-9956-ffa450edef68>2009-10-12 19:31:48 +0000
commitd3937102e3ee45a413aaf1f3aa19020d23d8b721 (patch)
tree80dda4f00db4110db7747147d7aadb6e0ce8e740 /java/sca/modules/endpoint-tribes
parentf1677566975979192ad0aafaba06f2d1fd1307bf (diff)
Make sure that entries are replicated to all nodes (not just at least a backup node)
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@824468 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/sca/modules/endpoint-tribes')
-rw-r--r--java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java21
-rw-r--r--java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java16
-rw-r--r--java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java10
-rw-r--r--java/sca/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistryTestCase.java (renamed from java/sca/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/Test.java)48
4 files changed, 62 insertions, 33 deletions
diff --git a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java
index 91aee4d585..98c6739854 100644
--- a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java
+++ b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java
@@ -685,7 +685,8 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb
}
public void mapMemberAdded(Member member) {
- if (member.equals(getChannel().getLocalMember(false)))
+ Member self = getChannel().getLocalMember(false);
+ if (member.equals(self))
return;
boolean memberAdded = false;
//select a backup node if we don't have one
@@ -703,11 +704,13 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb
MapEntry entry = (MapEntry)super.get(e.getKey());
if (entry == null)
continue;
- if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) {
+ // if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) {
+ // [rfeng] Change the behavior to replicate to all nodes
+ if (entry.isPrimary() && self.equals(entry.getPrimary())) {
try {
- Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
+ Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes());
entry.setBackupNodes(backup);
- entry.setPrimary(channel.getLocalMember(false));
+ entry.setPrimary(self);
} catch (ChannelException x) {
log.error("Unable to select backup node.", x);
} //catch
@@ -765,7 +768,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb
if (log.isDebugEnabled())
log.debug("[1] Primary choosing a new backup");
try {
- Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
+ Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes());
entry.setBackupNodes(backup);
entry.setPrimary(channel.getLocalMember(false));
} catch (ChannelException x) {
@@ -795,7 +798,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb
entry.setPrimary(channel.getLocalMember(false));
entry.setBackup(false);
entry.setProxy(false);
- Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
+ Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes());
entry.setBackupNodes(backup);
if (mapOwner != null)
mapOwner.objectMadePrimay(entry.getKey(), entry.getValue());
@@ -830,7 +833,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb
return members[node];
}
- protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException;
+ protected abstract Member[] publishEntryInfo(Object key, Object value, Member[] backupNodes) throws ChannelException;
public void heartbeat() {
try {
@@ -913,7 +916,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb
}
if (entry.isBackup()) {
//select a new backup node
- backup = publishEntryInfo(key, entry.getValue());
+ backup = publishEntryInfo(key, entry.getValue(), entry.getBackupNodes());
} else if (entry.isProxy()) {
//invalidate the previous primary
msg =
@@ -994,7 +997,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb
old = remove(key);
try {
if (notify) {
- Member[] backup = publishEntryInfo(key, value);
+ Member[] backup = publishEntryInfo(key, value, entry.getBackupNodes());
entry.setBackupNodes(backup);
}
} catch (ChannelException x) {
diff --git a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
index 09dcc011ee..a2823bb88c 100644
--- a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
+++ b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
@@ -75,6 +75,8 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
private ExtensionPointRegistry registry;
private ReplicatedMap map;
private static List<URL> staticRoutes;
+
+ private String id;
private static final Channel createChannel(String address, int port, String bindAddress) {
@@ -103,7 +105,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
String domainURI) {
this.registry = registry;
this.domainURI = domainURI;
-
+ this.id = "[" + System.identityHashCode(this) + "]";
getParameters(domainRegistryURI);
}
@@ -207,7 +209,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
try {
channel.stop(Channel.DEFAULT);
} catch (ChannelException e) {
- throw new IllegalStateException(e);
+ logger.log(Level.WARNING, e.getMessage(), e);
}
map = null;
}
@@ -336,6 +338,10 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
public void removeListener(EndpointListener listener) {
listeners.remove(listener);
}
+
+ public void replicate(boolean complete) {
+ map.replicate(complete);
+ }
public void updateEndpoint(String uri, Endpoint endpoint) {
Endpoint oldEndpoint = getEndpoint(uri);
@@ -348,7 +354,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
public void entryAdded(Object key, Object value) {
MapEntry entry = (MapEntry)value;
if (!isLocal(entry)) {
- logger.info("Remote endpoint added: " + entry.getValue());
+ logger.info(id + " Remote endpoint added: " + entry.getValue());
}
Endpoint newEp = (Endpoint)entry.getValue();
for (EndpointListener listener : listeners) {
@@ -359,7 +365,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
public void entryRemoved(Object key, Object value) {
MapEntry entry = (MapEntry)value;
if (!isLocal(entry)) {
- logger.info("Remote endpoint removed: " + entry.getValue());
+ logger.info(id + " Remote endpoint removed: " + entry.getValue());
}
Endpoint oldEp = (Endpoint)entry.getValue();
for (EndpointListener listener : listeners) {
@@ -371,7 +377,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
MapEntry oldEntry = (MapEntry)oldValue;
MapEntry newEntry = (MapEntry)newValue;
if (!isLocal(newEntry)) {
- logger.info("Remote endpoint updated: " + newEntry.getValue());
+ logger.info(id + " Remote endpoint updated: " + newEntry.getValue());
}
Endpoint oldEp = (Endpoint)oldEntry.getValue();
Endpoint newEp = (Endpoint)newEntry.getValue();
diff --git a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java
index af0bb6fb44..762407604d 100644
--- a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java
+++ b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java
@@ -97,7 +97,7 @@ public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback,
* @return Member - the backup node
* @throws ChannelException
*/
- protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException {
+ protected Member[] publishEntryInfo(Object key, Object value, Member[] backupNodes) throws ChannelException {
if (!(key instanceof Serializable && value instanceof Serializable))
return new Member[0];
//select a backup node
@@ -106,12 +106,18 @@ public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback,
if (backup == null || backup.length == 0)
return null;
+ // Set the receivers to these members that are not in the backup nodes yet
+ Member[] members = backup;
+ if (backupNodes != null) {
+ members = getMapMembersExcl(backupNodes);
+ }
+
//publish the data out to all nodes
MapMessage msg =
new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable)key, (Serializable)value,
null, channel.getLocalMember(false), backup);
- getChannel().send(getMapMembers(), msg, getChannelSendOptions());
+ getChannel().send(members, msg, getChannelSendOptions());
return backup;
}
diff --git a/java/sca/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/Test.java b/java/sca/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistryTestCase.java
index 8d60608851..a574ba9328 100644
--- a/java/sca/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/Test.java
+++ b/java/sca/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistryTestCase.java
@@ -23,42 +23,56 @@ import org.apache.tuscany.sca.assembly.AssemblyFactory;
import org.apache.tuscany.sca.assembly.Endpoint;
import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
-public class Test {
+public class ReplicatedEndpointRegistryTestCase {
- public static void main(String[] args) throws InterruptedException {
+ @Test
+ @Ignore("Ignore this test case for now as it might be sensitive to the multicast settings for a multi-homed machine")
+ public void testReplicate() throws InterruptedException {
DefaultExtensionPointRegistry extensionPoints = new DefaultExtensionPointRegistry();
FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class);
AssemblyFactory assemblyFactory = factories.getFactory(AssemblyFactory.class);
-
+
ReplicatedEndpointRegistry ep1 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar");
System.out.println("ep1 is: " + ep1);
ep1.start();
-
+
Endpoint e1 = assemblyFactory.createEndpoint();
e1.setURI("e1uri");
- e1.setExtensionPointRegistry(null);
+ e1.setExtensionPointRegistry(extensionPoints);
ep1.addEndpoint(e1);
-
+
Endpoint e1p = ep1.getEndpoint("e1uri");
- System.out.println(e1p);
-
+ System.out.println("EP1 in Registry 1: " + e1p);
+ Assert.assertNotNull(e1p);
+
ReplicatedEndpointRegistry ep2 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar");
System.out.println("ep2 is: " + ep2);
ep2.start();
- Thread.sleep(10000);
-
+ Thread.sleep(5000);
+
Endpoint e1p2 = ep2.getEndpoint("e1uri");
- System.out.println(e1p2);
-
-
+ System.out.println("EP1 in Registry 2: " + e1p2);
+ Assert.assertNotNull(e1p2);
+
ReplicatedEndpointRegistry ep3 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar");
System.out.println("ep3 is: " + ep3);
ep3.start();
- Thread.sleep(10000);
-
+ Thread.sleep(5000);
+
Endpoint e1p3 = ep3.getEndpoint("e1uri");
- System.out.println(e1p3);
-
+ System.out.println("EP1 in Registry 3: " + e1p3);
+ Assert.assertNotNull(e1p3);
+
+ ep1.stop();
+ ep2.stop();
+ ep3.stop();
+ }
+
+ public static void main(String[] args) throws Exception {
+ new ReplicatedEndpointRegistryTestCase().testReplicate();
}
}