summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache
diff options
context:
space:
mode:
authorantelder <antelder@13f79535-47bb-0310-9956-ffa450edef68>2010-03-09 20:16:01 +0000
committerantelder <antelder@13f79535-47bb-0310-9956-ffa450edef68>2010-03-09 20:16:01 +0000
commitbfdfdf09e34e5443ac1269324ecf7cfde0409600 (patch)
treea932ecff63af5a4aad440bce666e0f88333c0400 /sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache
parent97ed9313955ca477af00f44991b6d489b361e2c1 (diff)
Update Hazelcast endpoint registry to remove endpoints from a runtime that no longer exists
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@921095 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache')
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java82
1 files changed, 69 insertions, 13 deletions
diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java
index 50a7ea03a4..09cb8b1a01 100644
--- a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java
+++ b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java
@@ -21,6 +21,7 @@ package org.apache.tuscany.sca.endpoint.hazelcast;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -33,6 +34,7 @@ import org.apache.tuscany.sca.runtime.BaseEndpointRegistry;
import org.apache.tuscany.sca.runtime.DomainRegistryURI;
import org.apache.tuscany.sca.runtime.EndpointRegistry;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+import org.oasisopen.sca.ServiceRuntimeException;
import com.hazelcast.config.Config;
import com.hazelcast.config.NearCacheConfig;
@@ -42,9 +44,13 @@ import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
+import com.hazelcast.core.Member;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;
+import com.hazelcast.core.MultiMap;
+import com.hazelcast.core.Transaction;
import com.hazelcast.nio.Address;
/**
@@ -56,8 +62,9 @@ public class HazelcastEndpointRegistry extends BaseEndpointRegistry implements E
protected DomainRegistryURI configURI;
private HazelcastInstance hazelcastInstance;
- protected Map<Object, Object> map;
+ protected Map<Object, Object> endpointMap;
private Map<String, Endpoint> localEndpoints = new HashMap<String, Endpoint>();
+ private MultiMap<String, String> endpointOwners;
public HazelcastEndpointRegistry(ExtensionPointRegistry registry,
Map<String, String> attributes,
@@ -68,16 +75,19 @@ public class HazelcastEndpointRegistry extends BaseEndpointRegistry implements E
}
public void start() {
- if (map != null) {
+ if (endpointMap != null) {
throw new IllegalStateException("The registry has already been started");
}
if (configURI.toString().startsWith("tuscany:vm:")) {
- map = new HashMap<Object, Object>();
+ endpointMap = new HashMap<Object, Object>();
} else {
initHazelcastInstance();
IMap imap = hazelcastInstance.getMap(configURI.getDomainName() + "/Endpoints");
imap.addEntryListener(this, true);
- map = imap;
+ endpointMap = imap;
+
+ endpointOwners = hazelcastInstance.getMultiMap(configURI.getDomainName() + "/EndpointOwners");
+
hazelcastInstance.getCluster().addMembershipListener(this);
}
}
@@ -86,7 +96,8 @@ public class HazelcastEndpointRegistry extends BaseEndpointRegistry implements E
if (hazelcastInstance != null) {
hazelcastInstance.shutdown();
hazelcastInstance = null;
- map = null;
+ endpointMap = null;
+ endpointOwners = null;
}
}
@@ -136,14 +147,25 @@ public class HazelcastEndpointRegistry extends BaseEndpointRegistry implements E
}
public void addEndpoint(Endpoint endpoint) {
- map.put(endpoint.getURI(), endpoint);
- localEndpoints.put(endpoint.getURI(), endpoint);
+ String localMemberAddr = hazelcastInstance.getCluster().getLocalMember().getInetSocketAddress().toString();
+ String endpointURI = endpoint.getURI();
+ Transaction txn = hazelcastInstance.getTransaction();
+ txn.begin();
+ try {
+ endpointMap.put(endpointURI, endpoint);
+ endpointOwners.put(localMemberAddr, endpointURI);
+ txn.commit();
+ } catch (Throwable e) {
+ txn.rollback();
+ throw new ServiceRuntimeException(e);
+ }
+ localEndpoints.put(endpointURI, endpoint);
logger.info("Add endpoint - " + endpoint);
}
public List<Endpoint> findEndpoint(String uri) {
List<Endpoint> foundEndpoints = new ArrayList<Endpoint>();
- for (Object v : map.values()) {
+ for (Object v : endpointMap.values()) {
Endpoint endpoint = (Endpoint)v;
logger.fine("Matching against - " + endpoint);
if (matches(uri, endpoint.getURI())) {
@@ -170,16 +192,27 @@ public class HazelcastEndpointRegistry extends BaseEndpointRegistry implements E
}
public Endpoint getEndpoint(String uri) {
- return (Endpoint)map.get(uri);
+ return (Endpoint)endpointMap.get(uri);
}
public List<Endpoint> getEndpoints() {
- return new ArrayList(map.values());
+ return new ArrayList(endpointMap.values());
}
public void removeEndpoint(Endpoint endpoint) {
- map.remove(endpoint.getURI());
- localEndpoints.remove(endpoint.getURI());
+ String localMemberAddr = hazelcastInstance.getCluster().getLocalMember().getInetSocketAddress().toString();
+ String endpointURI = endpoint.getURI();
+ Transaction txn = hazelcastInstance.getTransaction();
+ txn.begin();
+ try {
+ endpointMap.remove(endpointURI);
+ endpointOwners.remove(localMemberAddr, endpointURI);
+ txn.commit();
+ } catch (Throwable e) {
+ txn.rollback();
+ throw new ServiceRuntimeException(e);
+ }
+ localEndpoints.remove(endpointURI);
logger.info("Removed endpoint - " + endpoint);
}
@@ -229,6 +262,29 @@ public class HazelcastEndpointRegistry extends BaseEndpointRegistry implements E
}
public void memberRemoved(MembershipEvent event) {
+ try {
+ String memberAddr = event.getMember().getInetSocketAddress().toString();
+ if (endpointOwners.containsKey(memberAddr)) {
+ ILock lock = hazelcastInstance.getLock("EndpointOwners/" + memberAddr);
+ lock.lock();
+ try {
+ if (endpointOwners.containsKey(memberAddr)) {
+ Collection<String> keys = endpointOwners.remove(memberAddr);
+ for (Object k : keys) {
+ endpointMap.remove(k);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ } catch (Exception e) {
+ if (e.getCause() != null && e.getCause().getCause() != null) {
+ // ignore hazelcast already shutdown exception
+ if (!"Hazelcast Instance is not active!".equals(e.getCause().getCause().getMessage())) {
+ throw new ServiceRuntimeException(e);
+ }
+ }
+ }
}
-
}