diff options
author | antelder <antelder@13f79535-47bb-0310-9956-ffa450edef68> | 2010-03-09 20:16:01 +0000 |
---|---|---|
committer | antelder <antelder@13f79535-47bb-0310-9956-ffa450edef68> | 2010-03-09 20:16:01 +0000 |
commit | bfdfdf09e34e5443ac1269324ecf7cfde0409600 (patch) | |
tree | a932ecff63af5a4aad440bce666e0f88333c0400 | |
parent | 97ed9313955ca477af00f44991b6d489b361e2c1 (diff) |
Update Hazelcast endpoint registry to remove endpoints from a runtime that no longer exists
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@921095 13f79535-47bb-0310-9956-ffa450edef68
2 files changed, 95 insertions, 93 deletions
diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java index 50a7ea03a4..09cb8b1a01 100644 --- a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java +++ b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java @@ -21,6 +21,7 @@ package org.apache.tuscany.sca.endpoint.hazelcast; import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -33,6 +34,7 @@ import org.apache.tuscany.sca.runtime.BaseEndpointRegistry; import org.apache.tuscany.sca.runtime.DomainRegistryURI;
import org.apache.tuscany.sca.runtime.EndpointRegistry;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+import org.oasisopen.sca.ServiceRuntimeException;
import com.hazelcast.config.Config;
import com.hazelcast.config.NearCacheConfig;
@@ -42,9 +44,13 @@ import com.hazelcast.core.EntryEvent; import com.hazelcast.core.EntryListener;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
+import com.hazelcast.core.Member;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;
+import com.hazelcast.core.MultiMap;
+import com.hazelcast.core.Transaction;
import com.hazelcast.nio.Address;
/**
@@ -56,8 +62,9 @@ public class HazelcastEndpointRegistry extends BaseEndpointRegistry implements E protected DomainRegistryURI configURI;
private HazelcastInstance hazelcastInstance;
- protected Map<Object, Object> map;
+ protected Map<Object, Object> endpointMap;
private Map<String, Endpoint> localEndpoints = new HashMap<String, Endpoint>();
+ private MultiMap<String, String> endpointOwners;
public HazelcastEndpointRegistry(ExtensionPointRegistry registry,
Map<String, String> attributes,
@@ -68,16 +75,19 @@ public class HazelcastEndpointRegistry extends BaseEndpointRegistry implements E }
public void start() {
- if (map != null) {
+ if (endpointMap != null) {
throw new IllegalStateException("The registry has already been started");
}
if (configURI.toString().startsWith("tuscany:vm:")) {
- map = new HashMap<Object, Object>();
+ endpointMap = new HashMap<Object, Object>();
} else {
initHazelcastInstance();
IMap imap = hazelcastInstance.getMap(configURI.getDomainName() + "/Endpoints");
imap.addEntryListener(this, true);
- map = imap;
+ endpointMap = imap;
+
+ endpointOwners = hazelcastInstance.getMultiMap(configURI.getDomainName() + "/EndpointOwners");
+
hazelcastInstance.getCluster().addMembershipListener(this);
}
}
@@ -86,7 +96,8 @@ public class HazelcastEndpointRegistry extends BaseEndpointRegistry implements E if (hazelcastInstance != null) {
hazelcastInstance.shutdown();
hazelcastInstance = null;
- map = null;
+ endpointMap = null;
+ endpointOwners = null;
}
}
@@ -136,14 +147,25 @@ public class HazelcastEndpointRegistry extends BaseEndpointRegistry implements E }
public void addEndpoint(Endpoint endpoint) {
- map.put(endpoint.getURI(), endpoint);
- localEndpoints.put(endpoint.getURI(), endpoint);
+ String localMemberAddr = hazelcastInstance.getCluster().getLocalMember().getInetSocketAddress().toString();
+ String endpointURI = endpoint.getURI();
+ Transaction txn = hazelcastInstance.getTransaction();
+ txn.begin();
+ try {
+ endpointMap.put(endpointURI, endpoint);
+ endpointOwners.put(localMemberAddr, endpointURI);
+ txn.commit();
+ } catch (Throwable e) {
+ txn.rollback();
+ throw new ServiceRuntimeException(e);
+ }
+ localEndpoints.put(endpointURI, endpoint);
logger.info("Add endpoint - " + endpoint);
}
public List<Endpoint> findEndpoint(String uri) {
List<Endpoint> foundEndpoints = new ArrayList<Endpoint>();
- for (Object v : map.values()) {
+ for (Object v : endpointMap.values()) {
Endpoint endpoint = (Endpoint)v;
logger.fine("Matching against - " + endpoint);
if (matches(uri, endpoint.getURI())) {
@@ -170,16 +192,27 @@ public class HazelcastEndpointRegistry extends BaseEndpointRegistry implements E }
public Endpoint getEndpoint(String uri) {
- return (Endpoint)map.get(uri);
+ return (Endpoint)endpointMap.get(uri);
}
public List<Endpoint> getEndpoints() {
- return new ArrayList(map.values());
+ return new ArrayList(endpointMap.values());
}
public void removeEndpoint(Endpoint endpoint) {
- map.remove(endpoint.getURI());
- localEndpoints.remove(endpoint.getURI());
+ String localMemberAddr = hazelcastInstance.getCluster().getLocalMember().getInetSocketAddress().toString();
+ String endpointURI = endpoint.getURI();
+ Transaction txn = hazelcastInstance.getTransaction();
+ txn.begin();
+ try {
+ endpointMap.remove(endpointURI);
+ endpointOwners.remove(localMemberAddr, endpointURI);
+ txn.commit();
+ } catch (Throwable e) {
+ txn.rollback();
+ throw new ServiceRuntimeException(e);
+ }
+ localEndpoints.remove(endpointURI);
logger.info("Removed endpoint - " + endpoint);
}
@@ -229,6 +262,29 @@ public class HazelcastEndpointRegistry extends BaseEndpointRegistry implements E }
public void memberRemoved(MembershipEvent event) {
+ try {
+ String memberAddr = event.getMember().getInetSocketAddress().toString();
+ if (endpointOwners.containsKey(memberAddr)) {
+ ILock lock = hazelcastInstance.getLock("EndpointOwners/" + memberAddr);
+ lock.lock();
+ try {
+ if (endpointOwners.containsKey(memberAddr)) {
+ Collection<String> keys = endpointOwners.remove(memberAddr);
+ for (Object k : keys) {
+ endpointMap.remove(k);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ } catch (Exception e) {
+ if (e.getCause() != null && e.getCause().getCause() != null) {
+ // ignore hazelcast already shutdown exception
+ if (!"Hazelcast Instance is not active!".equals(e.getCause().getCause().getMessage())) {
+ throw new ServiceRuntimeException(e);
+ }
+ }
+ }
}
-
}
diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java index 32e49dbe41..b7f0201244 100644 --- a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java +++ b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java @@ -19,10 +19,6 @@ package org.apache.tuscany.sca.endpoint.hazelcast; -import java.net.InetAddress; -import java.util.HashMap; -import java.util.Map; - import org.apache.tuscany.sca.assembly.AssemblyFactory; import org.apache.tuscany.sca.assembly.Binding; import org.apache.tuscany.sca.assembly.Component; @@ -31,18 +27,12 @@ import org.apache.tuscany.sca.assembly.SCABindingFactory; import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry; import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.core.FactoryExtensionPoint; -import org.apache.tuscany.sca.runtime.EndpointListener; import org.apache.tuscany.sca.runtime.RuntimeEndpoint; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; -import com.hazelcast.core.IMap; - -// Ignore so its not run in the build yet till its working -@Ignore("Hazelcast doesn't support the map entry management by members") -public class MultiRegTestCase implements EndpointListener { +public class MultiRegTestCase { private static ExtensionPointRegistry extensionPoints; private static AssemblyFactory assemblyFactory; private static SCABindingFactory scaBindingFactory; @@ -57,97 +47,69 @@ public class MultiRegTestCase implements EndpointListener { @Test public void testReplication() throws Exception { - RuntimeEndpoint ep1 = createEndpoint("ep1uri"); - String host = InetAddress.getLocalHost().getHostAddress(); - String bind = null; // "9.65.158.31"; - String port1 = "8085"; - String port2 = "8086"; - String port3 = "8087"; - String range = "1"; - - Map<String, String> attrs1 = new HashMap<String, String>(); - // attrs1.put("nomcast", "true"); - attrs1.put("bind", bind); - attrs1.put("receiverPort", port1); - attrs1.put("receiverAutoBind", range); - // attrs1.put("routes", host + ":" + port2 + " " + host + ":" + port3); - HazelcastEndpointRegistry reg1 = new HazelcastEndpointRegistry(extensionPoints, attrs1, "tuscany:foo", "bar"); - reg1.addListener(this); + System.out.println("Starting reg1"); + HazelcastEndpointRegistry reg1 = new HazelcastEndpointRegistry(extensionPoints, null, "tuscany:foo?listen=127.0.0.1:9876&multicast=off", "bar"); reg1.start(); - Map<String, String> attrs2 = new HashMap<String, String>(); - // attrs2.put("nomcast", "true"); - attrs1.put("bind", bind); - attrs2.put("receiverPort", port2); - attrs2.put("receiverAutoBind", range); - // attrs2.put("routes", host + ":"+port1); - HazelcastEndpointRegistry reg2 = new HazelcastEndpointRegistry(extensionPoints, attrs2, "tuscany:foo", "bar"); - reg2.addListener(this); + System.out.println("Adding ep1"); + RuntimeEndpoint ep1 = createEndpoint("ep1uri"); + ep1.bind(extensionPoints, reg1); + reg1.addEndpoint(ep1); + + System.out.println("Starting reg3"); + HazelcastEndpointRegistry reg2 = new HazelcastEndpointRegistry(extensionPoints, null, "tuscany:foo?listen=127.0.0.1:9877&multicast=off&remotes=127.0.0.1:9876", "bar"); reg2.start(); - Map<String, String> attrs3 = new HashMap<String, String>(); - // attrs3.put("nomcast", "true"); - attrs1.put("bind", bind); - attrs3.put("receiverPort", port3); - attrs3.put("receiverAutoBind", range); - // attrs3.put("routes", host + ":"+port1); - HazelcastEndpointRegistry reg3 = new HazelcastEndpointRegistry(extensionPoints, attrs3, "tuscany:foo", "bar"); - reg3.addListener(this); + System.out.println("Starting reg2"); + HazelcastEndpointRegistry reg3 = new HazelcastEndpointRegistry(extensionPoints, null, "tuscany:foo?listen=127.0.0.1:9878&multicast=off&remotes=127.0.0.1:9877", "bar"); reg3.start(); - ep1.bind(extensionPoints, reg1); - reg1.addEndpoint(ep1); assertExists(reg1, "ep1uri"); assertExists(reg2, "ep1uri"); assertExists(reg3, "ep1uri"); + System.out.println("Adding ep2"); RuntimeEndpoint ep2 = createEndpoint("ep2uri"); ep2.bind(extensionPoints, reg2); reg2.addEndpoint(ep2); + assertExists(reg2, "ep2uri"); assertExists(reg1, "ep2uri"); assertExists(reg3, "ep2uri"); - System.out.println(((IMap)reg1.map).localKeySet().size()); - System.out.println(((IMap)reg2.map).localKeySet().size()); - System.out.println(((IMap)reg3.map).localKeySet().size()); - + System.out.println("Stopping reg1"); reg1.stop(); - Thread.sleep(6000); + System.out.println("Stopped reg1"); + Thread.sleep(500); + Assert.assertNull(reg2.getEndpoint("ep1uri")); Assert.assertNull(reg3.getEndpoint("ep1uri")); - System.out.println(((IMap)reg2.map).localKeySet().size()); - System.out.println(((IMap)reg3.map).localKeySet().size()); - assertExists(reg2, "ep2uri"); assertExists(reg3, "ep2uri"); + System.out.println("Starting reg1"); reg1.start(); ep1.bind(extensionPoints, reg1); + + System.out.println("adding ep1"); reg1.addEndpoint(ep1); assertExists(reg1, "ep1uri"); assertExists(reg2, "ep1uri"); assertExists(reg3, "ep1uri"); + System.out.println("Stopping reg1"); reg1.stop(); + System.out.println("Stopping reg2"); reg2.stop(); + System.out.println("Stopping reg3"); reg3.stop(); - System.out.println(); // closed + System.out.println("done"); } private Endpoint assertExists(HazelcastEndpointRegistry reg, String uri) throws InterruptedException { - Endpoint ep = null; - int count = 0; - while (ep == null && count < 15) { - ep = reg.getEndpoint(uri); - if (ep == null) { - Thread.sleep(1000); - System.out.println(reg + ": tries=" + count); - } - count++; - } + Endpoint ep = reg.getEndpoint(uri); Assert.assertNotNull(ep); Assert.assertEquals(uri, ep.getURI()); return ep; @@ -164,20 +126,4 @@ public class MultiRegTestCase implements EndpointListener { return ep; } - private void print(String prefix, Endpoint ep) { - System.out.println(prefix + ": "+ep); - } - - public void endpointAdded(Endpoint endpoint) { - print("Added", endpoint); - } - - public void endpointRemoved(Endpoint endpoint) { - print("Removed", endpoint); - } - - public void endpointUpdated(Endpoint oldEndpoint, Endpoint newEndpoint) { - print("Updated", newEndpoint); - } - } |