summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java82
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java106
2 files changed, 95 insertions, 93 deletions
diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java
index 50a7ea03a4..09cb8b1a01 100644
--- a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java
+++ b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java
@@ -21,6 +21,7 @@ package org.apache.tuscany.sca.endpoint.hazelcast;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -33,6 +34,7 @@ import org.apache.tuscany.sca.runtime.BaseEndpointRegistry;
import org.apache.tuscany.sca.runtime.DomainRegistryURI;
import org.apache.tuscany.sca.runtime.EndpointRegistry;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+import org.oasisopen.sca.ServiceRuntimeException;
import com.hazelcast.config.Config;
import com.hazelcast.config.NearCacheConfig;
@@ -42,9 +44,13 @@ import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
+import com.hazelcast.core.Member;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;
+import com.hazelcast.core.MultiMap;
+import com.hazelcast.core.Transaction;
import com.hazelcast.nio.Address;
/**
@@ -56,8 +62,9 @@ public class HazelcastEndpointRegistry extends BaseEndpointRegistry implements E
protected DomainRegistryURI configURI;
private HazelcastInstance hazelcastInstance;
- protected Map<Object, Object> map;
+ protected Map<Object, Object> endpointMap;
private Map<String, Endpoint> localEndpoints = new HashMap<String, Endpoint>();
+ private MultiMap<String, String> endpointOwners;
public HazelcastEndpointRegistry(ExtensionPointRegistry registry,
Map<String, String> attributes,
@@ -68,16 +75,19 @@ public class HazelcastEndpointRegistry extends BaseEndpointRegistry implements E
}
public void start() {
- if (map != null) {
+ if (endpointMap != null) {
throw new IllegalStateException("The registry has already been started");
}
if (configURI.toString().startsWith("tuscany:vm:")) {
- map = new HashMap<Object, Object>();
+ endpointMap = new HashMap<Object, Object>();
} else {
initHazelcastInstance();
IMap imap = hazelcastInstance.getMap(configURI.getDomainName() + "/Endpoints");
imap.addEntryListener(this, true);
- map = imap;
+ endpointMap = imap;
+
+ endpointOwners = hazelcastInstance.getMultiMap(configURI.getDomainName() + "/EndpointOwners");
+
hazelcastInstance.getCluster().addMembershipListener(this);
}
}
@@ -86,7 +96,8 @@ public class HazelcastEndpointRegistry extends BaseEndpointRegistry implements E
if (hazelcastInstance != null) {
hazelcastInstance.shutdown();
hazelcastInstance = null;
- map = null;
+ endpointMap = null;
+ endpointOwners = null;
}
}
@@ -136,14 +147,25 @@ public class HazelcastEndpointRegistry extends BaseEndpointRegistry implements E
}
public void addEndpoint(Endpoint endpoint) {
- map.put(endpoint.getURI(), endpoint);
- localEndpoints.put(endpoint.getURI(), endpoint);
+ String localMemberAddr = hazelcastInstance.getCluster().getLocalMember().getInetSocketAddress().toString();
+ String endpointURI = endpoint.getURI();
+ Transaction txn = hazelcastInstance.getTransaction();
+ txn.begin();
+ try {
+ endpointMap.put(endpointURI, endpoint);
+ endpointOwners.put(localMemberAddr, endpointURI);
+ txn.commit();
+ } catch (Throwable e) {
+ txn.rollback();
+ throw new ServiceRuntimeException(e);
+ }
+ localEndpoints.put(endpointURI, endpoint);
logger.info("Add endpoint - " + endpoint);
}
public List<Endpoint> findEndpoint(String uri) {
List<Endpoint> foundEndpoints = new ArrayList<Endpoint>();
- for (Object v : map.values()) {
+ for (Object v : endpointMap.values()) {
Endpoint endpoint = (Endpoint)v;
logger.fine("Matching against - " + endpoint);
if (matches(uri, endpoint.getURI())) {
@@ -170,16 +192,27 @@ public class HazelcastEndpointRegistry extends BaseEndpointRegistry implements E
}
public Endpoint getEndpoint(String uri) {
- return (Endpoint)map.get(uri);
+ return (Endpoint)endpointMap.get(uri);
}
public List<Endpoint> getEndpoints() {
- return new ArrayList(map.values());
+ return new ArrayList(endpointMap.values());
}
public void removeEndpoint(Endpoint endpoint) {
- map.remove(endpoint.getURI());
- localEndpoints.remove(endpoint.getURI());
+ String localMemberAddr = hazelcastInstance.getCluster().getLocalMember().getInetSocketAddress().toString();
+ String endpointURI = endpoint.getURI();
+ Transaction txn = hazelcastInstance.getTransaction();
+ txn.begin();
+ try {
+ endpointMap.remove(endpointURI);
+ endpointOwners.remove(localMemberAddr, endpointURI);
+ txn.commit();
+ } catch (Throwable e) {
+ txn.rollback();
+ throw new ServiceRuntimeException(e);
+ }
+ localEndpoints.remove(endpointURI);
logger.info("Removed endpoint - " + endpoint);
}
@@ -229,6 +262,29 @@ public class HazelcastEndpointRegistry extends BaseEndpointRegistry implements E
}
public void memberRemoved(MembershipEvent event) {
+ try {
+ String memberAddr = event.getMember().getInetSocketAddress().toString();
+ if (endpointOwners.containsKey(memberAddr)) {
+ ILock lock = hazelcastInstance.getLock("EndpointOwners/" + memberAddr);
+ lock.lock();
+ try {
+ if (endpointOwners.containsKey(memberAddr)) {
+ Collection<String> keys = endpointOwners.remove(memberAddr);
+ for (Object k : keys) {
+ endpointMap.remove(k);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ } catch (Exception e) {
+ if (e.getCause() != null && e.getCause().getCause() != null) {
+ // ignore hazelcast already shutdown exception
+ if (!"Hazelcast Instance is not active!".equals(e.getCause().getCause().getMessage())) {
+ throw new ServiceRuntimeException(e);
+ }
+ }
+ }
}
-
}
diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java
index 32e49dbe41..b7f0201244 100644
--- a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java
+++ b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java
@@ -19,10 +19,6 @@
package org.apache.tuscany.sca.endpoint.hazelcast;
-import java.net.InetAddress;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.tuscany.sca.assembly.AssemblyFactory;
import org.apache.tuscany.sca.assembly.Binding;
import org.apache.tuscany.sca.assembly.Component;
@@ -31,18 +27,12 @@ import org.apache.tuscany.sca.assembly.SCABindingFactory;
import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
-import org.apache.tuscany.sca.runtime.EndpointListener;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
import org.junit.Assert;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
-import com.hazelcast.core.IMap;
-
-// Ignore so its not run in the build yet till its working
-@Ignore("Hazelcast doesn't support the map entry management by members")
-public class MultiRegTestCase implements EndpointListener {
+public class MultiRegTestCase {
private static ExtensionPointRegistry extensionPoints;
private static AssemblyFactory assemblyFactory;
private static SCABindingFactory scaBindingFactory;
@@ -57,97 +47,69 @@ public class MultiRegTestCase implements EndpointListener {
@Test
public void testReplication() throws Exception {
- RuntimeEndpoint ep1 = createEndpoint("ep1uri");
- String host = InetAddress.getLocalHost().getHostAddress();
- String bind = null; // "9.65.158.31";
- String port1 = "8085";
- String port2 = "8086";
- String port3 = "8087";
- String range = "1";
-
- Map<String, String> attrs1 = new HashMap<String, String>();
- // attrs1.put("nomcast", "true");
- attrs1.put("bind", bind);
- attrs1.put("receiverPort", port1);
- attrs1.put("receiverAutoBind", range);
- // attrs1.put("routes", host + ":" + port2 + " " + host + ":" + port3);
- HazelcastEndpointRegistry reg1 = new HazelcastEndpointRegistry(extensionPoints, attrs1, "tuscany:foo", "bar");
- reg1.addListener(this);
+ System.out.println("Starting reg1");
+ HazelcastEndpointRegistry reg1 = new HazelcastEndpointRegistry(extensionPoints, null, "tuscany:foo?listen=127.0.0.1:9876&multicast=off", "bar");
reg1.start();
- Map<String, String> attrs2 = new HashMap<String, String>();
- // attrs2.put("nomcast", "true");
- attrs1.put("bind", bind);
- attrs2.put("receiverPort", port2);
- attrs2.put("receiverAutoBind", range);
- // attrs2.put("routes", host + ":"+port1);
- HazelcastEndpointRegistry reg2 = new HazelcastEndpointRegistry(extensionPoints, attrs2, "tuscany:foo", "bar");
- reg2.addListener(this);
+ System.out.println("Adding ep1");
+ RuntimeEndpoint ep1 = createEndpoint("ep1uri");
+ ep1.bind(extensionPoints, reg1);
+ reg1.addEndpoint(ep1);
+
+ System.out.println("Starting reg3");
+ HazelcastEndpointRegistry reg2 = new HazelcastEndpointRegistry(extensionPoints, null, "tuscany:foo?listen=127.0.0.1:9877&multicast=off&remotes=127.0.0.1:9876", "bar");
reg2.start();
- Map<String, String> attrs3 = new HashMap<String, String>();
- // attrs3.put("nomcast", "true");
- attrs1.put("bind", bind);
- attrs3.put("receiverPort", port3);
- attrs3.put("receiverAutoBind", range);
- // attrs3.put("routes", host + ":"+port1);
- HazelcastEndpointRegistry reg3 = new HazelcastEndpointRegistry(extensionPoints, attrs3, "tuscany:foo", "bar");
- reg3.addListener(this);
+ System.out.println("Starting reg2");
+ HazelcastEndpointRegistry reg3 = new HazelcastEndpointRegistry(extensionPoints, null, "tuscany:foo?listen=127.0.0.1:9878&multicast=off&remotes=127.0.0.1:9877", "bar");
reg3.start();
- ep1.bind(extensionPoints, reg1);
- reg1.addEndpoint(ep1);
assertExists(reg1, "ep1uri");
assertExists(reg2, "ep1uri");
assertExists(reg3, "ep1uri");
+ System.out.println("Adding ep2");
RuntimeEndpoint ep2 = createEndpoint("ep2uri");
ep2.bind(extensionPoints, reg2);
reg2.addEndpoint(ep2);
+
assertExists(reg2, "ep2uri");
assertExists(reg1, "ep2uri");
assertExists(reg3, "ep2uri");
- System.out.println(((IMap)reg1.map).localKeySet().size());
- System.out.println(((IMap)reg2.map).localKeySet().size());
- System.out.println(((IMap)reg3.map).localKeySet().size());
-
+ System.out.println("Stopping reg1");
reg1.stop();
- Thread.sleep(6000);
+ System.out.println("Stopped reg1");
+ Thread.sleep(500);
+
Assert.assertNull(reg2.getEndpoint("ep1uri"));
Assert.assertNull(reg3.getEndpoint("ep1uri"));
- System.out.println(((IMap)reg2.map).localKeySet().size());
- System.out.println(((IMap)reg3.map).localKeySet().size());
-
assertExists(reg2, "ep2uri");
assertExists(reg3, "ep2uri");
+ System.out.println("Starting reg1");
reg1.start();
ep1.bind(extensionPoints, reg1);
+
+ System.out.println("adding ep1");
reg1.addEndpoint(ep1);
assertExists(reg1, "ep1uri");
assertExists(reg2, "ep1uri");
assertExists(reg3, "ep1uri");
+ System.out.println("Stopping reg1");
reg1.stop();
+ System.out.println("Stopping reg2");
reg2.stop();
+ System.out.println("Stopping reg3");
reg3.stop();
- System.out.println(); // closed
+ System.out.println("done");
}
private Endpoint assertExists(HazelcastEndpointRegistry reg, String uri) throws InterruptedException {
- Endpoint ep = null;
- int count = 0;
- while (ep == null && count < 15) {
- ep = reg.getEndpoint(uri);
- if (ep == null) {
- Thread.sleep(1000);
- System.out.println(reg + ": tries=" + count);
- }
- count++;
- }
+ Endpoint ep = reg.getEndpoint(uri);
Assert.assertNotNull(ep);
Assert.assertEquals(uri, ep.getURI());
return ep;
@@ -164,20 +126,4 @@ public class MultiRegTestCase implements EndpointListener {
return ep;
}
- private void print(String prefix, Endpoint ep) {
- System.out.println(prefix + ": "+ep);
- }
-
- public void endpointAdded(Endpoint endpoint) {
- print("Added", endpoint);
- }
-
- public void endpointRemoved(Endpoint endpoint) {
- print("Removed", endpoint);
- }
-
- public void endpointUpdated(Endpoint oldEndpoint, Endpoint newEndpoint) {
- print("Updated", newEndpoint);
- }
-
}