From d3937102e3ee45a413aaf1f3aa19020d23d8b721 Mon Sep 17 00:00:00 2001 From: rfeng Date: Mon, 12 Oct 2009 19:31:48 +0000 Subject: Make sure that entries are replicated to all nodes (not just at least a backup node) git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@824468 13f79535-47bb-0310-9956-ffa450edef68 --- .../sca/endpoint/tribes/AbstractReplicatedMap.java | 21 +++--- .../tribes/ReplicatedEndpointRegistry.java | 16 +++-- .../tuscany/sca/endpoint/tribes/ReplicatedMap.java | 10 ++- .../tribes/ReplicatedEndpointRegistryTestCase.java | 78 ++++++++++++++++++++++ .../apache/tuscany/sca/endpoint/tribes/Test.java | 64 ------------------ 5 files changed, 109 insertions(+), 80 deletions(-) create mode 100644 java/sca/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistryTestCase.java delete mode 100644 java/sca/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/Test.java (limited to 'java') diff --git a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java index 91aee4d585..98c6739854 100644 --- a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java +++ b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java @@ -685,7 +685,8 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb } public void mapMemberAdded(Member member) { - if (member.equals(getChannel().getLocalMember(false))) + Member self = getChannel().getLocalMember(false); + if (member.equals(self)) return; boolean memberAdded = false; //select a backup node if we don't have one @@ -703,11 +704,13 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb MapEntry entry = (MapEntry)super.get(e.getKey()); if (entry == null) continue; - if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) { + // if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) { + // [rfeng] Change the behavior to replicate to all nodes + if (entry.isPrimary() && self.equals(entry.getPrimary())) { try { - Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes()); entry.setBackupNodes(backup); - entry.setPrimary(channel.getLocalMember(false)); + entry.setPrimary(self); } catch (ChannelException x) { log.error("Unable to select backup node.", x); } //catch @@ -765,7 +768,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb if (log.isDebugEnabled()) log.debug("[1] Primary choosing a new backup"); try { - Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes()); entry.setBackupNodes(backup); entry.setPrimary(channel.getLocalMember(false)); } catch (ChannelException x) { @@ -795,7 +798,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb entry.setPrimary(channel.getLocalMember(false)); entry.setBackup(false); entry.setProxy(false); - Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes()); entry.setBackupNodes(backup); if (mapOwner != null) mapOwner.objectMadePrimay(entry.getKey(), entry.getValue()); @@ -830,7 +833,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb return members[node]; } - protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException; + protected abstract Member[] publishEntryInfo(Object key, Object value, Member[] backupNodes) throws ChannelException; public void heartbeat() { try { @@ -913,7 +916,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb } if (entry.isBackup()) { //select a new backup node - backup = publishEntryInfo(key, entry.getValue()); + backup = publishEntryInfo(key, entry.getValue(), entry.getBackupNodes()); } else if (entry.isProxy()) { //invalidate the previous primary msg = @@ -994,7 +997,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb old = remove(key); try { if (notify) { - Member[] backup = publishEntryInfo(key, value); + Member[] backup = publishEntryInfo(key, value, entry.getBackupNodes()); entry.setBackupNodes(backup); } } catch (ChannelException x) { diff --git a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java index 09dcc011ee..a2823bb88c 100644 --- a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java +++ b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java @@ -75,6 +75,8 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi private ExtensionPointRegistry registry; private ReplicatedMap map; private static List staticRoutes; + + private String id; private static final Channel createChannel(String address, int port, String bindAddress) { @@ -103,7 +105,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi String domainURI) { this.registry = registry; this.domainURI = domainURI; - + this.id = "[" + System.identityHashCode(this) + "]"; getParameters(domainRegistryURI); } @@ -207,7 +209,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi try { channel.stop(Channel.DEFAULT); } catch (ChannelException e) { - throw new IllegalStateException(e); + logger.log(Level.WARNING, e.getMessage(), e); } map = null; } @@ -336,6 +338,10 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi public void removeListener(EndpointListener listener) { listeners.remove(listener); } + + public void replicate(boolean complete) { + map.replicate(complete); + } public void updateEndpoint(String uri, Endpoint endpoint) { Endpoint oldEndpoint = getEndpoint(uri); @@ -348,7 +354,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi public void entryAdded(Object key, Object value) { MapEntry entry = (MapEntry)value; if (!isLocal(entry)) { - logger.info("Remote endpoint added: " + entry.getValue()); + logger.info(id + " Remote endpoint added: " + entry.getValue()); } Endpoint newEp = (Endpoint)entry.getValue(); for (EndpointListener listener : listeners) { @@ -359,7 +365,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi public void entryRemoved(Object key, Object value) { MapEntry entry = (MapEntry)value; if (!isLocal(entry)) { - logger.info("Remote endpoint removed: " + entry.getValue()); + logger.info(id + " Remote endpoint removed: " + entry.getValue()); } Endpoint oldEp = (Endpoint)entry.getValue(); for (EndpointListener listener : listeners) { @@ -371,7 +377,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi MapEntry oldEntry = (MapEntry)oldValue; MapEntry newEntry = (MapEntry)newValue; if (!isLocal(newEntry)) { - logger.info("Remote endpoint updated: " + newEntry.getValue()); + logger.info(id + " Remote endpoint updated: " + newEntry.getValue()); } Endpoint oldEp = (Endpoint)oldEntry.getValue(); Endpoint newEp = (Endpoint)newEntry.getValue(); diff --git a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java index af0bb6fb44..762407604d 100644 --- a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java +++ b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java @@ -97,7 +97,7 @@ public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback, * @return Member - the backup node * @throws ChannelException */ - protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException { + protected Member[] publishEntryInfo(Object key, Object value, Member[] backupNodes) throws ChannelException { if (!(key instanceof Serializable && value instanceof Serializable)) return new Member[0]; //select a backup node @@ -106,12 +106,18 @@ public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback, if (backup == null || backup.length == 0) return null; + // Set the receivers to these members that are not in the backup nodes yet + Member[] members = backup; + if (backupNodes != null) { + members = getMapMembersExcl(backupNodes); + } + //publish the data out to all nodes MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable)key, (Serializable)value, null, channel.getLocalMember(false), backup); - getChannel().send(getMapMembers(), msg, getChannelSendOptions()); + getChannel().send(members, msg, getChannelSendOptions()); return backup; } diff --git a/java/sca/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistryTestCase.java b/java/sca/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistryTestCase.java new file mode 100644 index 0000000000..a574ba9328 --- /dev/null +++ b/java/sca/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistryTestCase.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.endpoint.tribes; + +import org.apache.tuscany.sca.assembly.AssemblyFactory; +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry; +import org.apache.tuscany.sca.core.FactoryExtensionPoint; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +public class ReplicatedEndpointRegistryTestCase { + + @Test + @Ignore("Ignore this test case for now as it might be sensitive to the multicast settings for a multi-homed machine") + public void testReplicate() throws InterruptedException { + DefaultExtensionPointRegistry extensionPoints = new DefaultExtensionPointRegistry(); + FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class); + AssemblyFactory assemblyFactory = factories.getFactory(AssemblyFactory.class); + + ReplicatedEndpointRegistry ep1 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar"); + System.out.println("ep1 is: " + ep1); + ep1.start(); + + Endpoint e1 = assemblyFactory.createEndpoint(); + e1.setURI("e1uri"); + e1.setExtensionPointRegistry(extensionPoints); + ep1.addEndpoint(e1); + + Endpoint e1p = ep1.getEndpoint("e1uri"); + System.out.println("EP1 in Registry 1: " + e1p); + Assert.assertNotNull(e1p); + + ReplicatedEndpointRegistry ep2 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar"); + System.out.println("ep2 is: " + ep2); + ep2.start(); + Thread.sleep(5000); + + Endpoint e1p2 = ep2.getEndpoint("e1uri"); + System.out.println("EP1 in Registry 2: " + e1p2); + Assert.assertNotNull(e1p2); + + ReplicatedEndpointRegistry ep3 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar"); + System.out.println("ep3 is: " + ep3); + ep3.start(); + Thread.sleep(5000); + + Endpoint e1p3 = ep3.getEndpoint("e1uri"); + System.out.println("EP1 in Registry 3: " + e1p3); + Assert.assertNotNull(e1p3); + + ep1.stop(); + ep2.stop(); + ep3.stop(); + } + + public static void main(String[] args) throws Exception { + new ReplicatedEndpointRegistryTestCase().testReplicate(); + } +} diff --git a/java/sca/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/Test.java b/java/sca/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/Test.java deleted file mode 100644 index 8d60608851..0000000000 --- a/java/sca/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/Test.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.endpoint.tribes; - -import org.apache.tuscany.sca.assembly.AssemblyFactory; -import org.apache.tuscany.sca.assembly.Endpoint; -import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry; -import org.apache.tuscany.sca.core.FactoryExtensionPoint; - -public class Test { - - public static void main(String[] args) throws InterruptedException { - DefaultExtensionPointRegistry extensionPoints = new DefaultExtensionPointRegistry(); - FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class); - AssemblyFactory assemblyFactory = factories.getFactory(AssemblyFactory.class); - - ReplicatedEndpointRegistry ep1 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar"); - System.out.println("ep1 is: " + ep1); - ep1.start(); - - Endpoint e1 = assemblyFactory.createEndpoint(); - e1.setURI("e1uri"); - e1.setExtensionPointRegistry(null); - ep1.addEndpoint(e1); - - Endpoint e1p = ep1.getEndpoint("e1uri"); - System.out.println(e1p); - - ReplicatedEndpointRegistry ep2 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar"); - System.out.println("ep2 is: " + ep2); - ep2.start(); - Thread.sleep(10000); - - Endpoint e1p2 = ep2.getEndpoint("e1uri"); - System.out.println(e1p2); - - - ReplicatedEndpointRegistry ep3 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar"); - System.out.println("ep3 is: " + ep3); - ep3.start(); - Thread.sleep(10000); - - Endpoint e1p3 = ep3.getEndpoint("e1uri"); - System.out.println(e1p3); - - } -} -- cgit v1.2.3