summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/trunk/modules/endpoint-hazelcast
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-hazelcast/META-INF/MANIFEST.MF28
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-hazelcast/pom.xml63
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastRegistry.java119
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/ReplicatedEndpointRegistry.java458
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.EndpointRegistry18
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/RegistryTestCase.java140
6 files changed, 826 insertions, 0 deletions
diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/META-INF/MANIFEST.MF b/sca-java-2.x/trunk/modules/endpoint-hazelcast/META-INF/MANIFEST.MF
new file mode 100644
index 0000000000..06df55ef38
--- /dev/null
+++ b/sca-java-2.x/trunk/modules/endpoint-hazelcast/META-INF/MANIFEST.MF
@@ -0,0 +1,28 @@
+Manifest-Version: 1.0
+Private-Package: org.apache.tuscany.sca.xsd.impl;version="2.0.0"
+SCA-Version: 1.1
+Bundle-Name: Apache Tuscany SCA Tomcat Tribes Based EndPoint Registry
+Bundle-Vendor: The Apache Software Foundation
+Bundle-Version: 2.0.0
+Bundle-ManifestVersion: 2
+Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt
+Bundle-Description: Apache Tuscany SCA XSD Model
+Bundle-SymbolicName: org.apache.tuscany.sca.endpoint.tribes
+Bundle-DocURL: http://www.apache.org/
+Bundle-RequiredExecutionEnvironment: J2SE-1.5,JavaSE-1.6
+Import-Package: org.apache.catalina.tribes,
+ org.apache.catalina.tribes.group,
+ org.apache.catalina.tribes.group.interceptors,
+ org.apache.catalina.tribes.io,
+ org.apache.catalina.tribes.membership,
+ org.apache.catalina.tribes.tipis,
+ org.apache.catalina.tribes.transport,
+ org.apache.catalina.tribes.util,
+ org.apache.juli.logging;resolution:=optional,
+ org.apache.tuscany.sca.assembly;version="2.0.0",
+ org.apache.tuscany.sca.core;version="2.0.0",
+ org.apache.tuscany.sca.core.assembly.impl;scope=internal;version="2.0.0";resolution:=optional,
+ org.apache.tuscany.sca.management;version="2.0.0",
+ org.apache.tuscany.sca.policy;version="2.0.0",
+ org.apache.tuscany.sca.runtime;version="2.0.0"
+Export-Package: org.apache.tuscany.sca.endpoint.tribes;version="2.0.0"
diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/pom.xml b/sca-java-2.x/trunk/modules/endpoint-hazelcast/pom.xml
new file mode 100644
index 0000000000..db2011a387
--- /dev/null
+++ b/sca-java-2.x/trunk/modules/endpoint-hazelcast/pom.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+<project>
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.tuscany.sca</groupId>
+ <artifactId>tuscany-modules</artifactId>
+ <version>2.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <artifactId>tuscany-endpoint-hazelcast</artifactId>
+ <name>Apache Tuscany SCA EndPoint Registry using Hazelcast</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.hazelcast</groupId>
+ <artifactId>hazelcast</artifactId>
+ <version>1.8</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tuscany.sca</groupId>
+ <artifactId>tuscany-core-spi</artifactId>
+ <version>2.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tuscany.sca</groupId>
+ <artifactId>tuscany-core</artifactId>
+ <version>2.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tuscany.sca</groupId>
+ <artifactId>tuscany-deployment</artifactId>
+ <version>2.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tuscany.sca</groupId>
+ <artifactId>tuscany-implementation-java-runtime</artifactId>
+ <version>2.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastRegistry.java b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastRegistry.java
new file mode 100644
index 0000000000..15247f0035
--- /dev/null
+++ b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastRegistry.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.hazelcast;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+
+import org.apache.tuscany.sca.assembly.Endpoint;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.assembly.impl.EndpointRegistryImpl;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.TcpIpConfig;
+import com.hazelcast.config.XmlConfigBuilder;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+import com.hazelcast.nio.Address;
+
+/**
+ * tuscany:[domainName]?listen=[port|ip:port]]&password=abc&multicast=[off|port|ip:port]&remotes=ip:port,ip:port,...
+ * listen defines the local bind address and port, it defaults to all network interfaces on port 14820 and if that port in use it will try incrementing by one till a free port is found.
+ * password is the password other nodes must use to connect to this domain. The default is 'tuscany'.
+ * multicast defines if multicast discover is used and if so what multicast ip group and port is used. The default is multicast is off if remotes= is specified (only for now due to a code limitation that is planned to be fixed), other wise if remotes= is not specified then multicast defaults to 224.5.12.10:51482
+ *
+ */
+
+public class HazelcastRegistry extends EndpointRegistryImpl {
+
+ private String endpointRegistryURI;
+ private String domainURI;
+ private IMap<Object, Object> hazelcastMap;
+
+ public HazelcastRegistry(ExtensionPointRegistry extensionPoints, String endpointRegistryURI, String domainURI) {
+ super(extensionPoints, endpointRegistryURI, domainURI);
+ init();
+ }
+
+ @Override
+ public synchronized void addEndpoint(Endpoint endpoint) {
+ hazelcastMap.put(endpoint.getURI(), endpoint);
+ super.addEndpoint(endpoint);
+ }
+
+ @Override
+ public synchronized void removeEndpoint(Endpoint endpoint) {
+ hazelcastMap.remove(endpoint.getURI());
+ super.addEndpoint(endpoint);
+ }
+
+ @Override
+ public synchronized void updateEndpoint(String uri, Endpoint endpoint) {
+ // TODO: is updateEndpoint needed?
+ throw new UnsupportedOperationException();
+ }
+
+ protected void init() {
+ int listenPort = 0;
+ int connectPorts = 0;
+ boolean multicast = false;
+ try {
+ HazelcastInstance hazelcastInstance = createHazelcastInstance(multicast, listenPort, connectPorts);
+ String domainName = "";
+ hazelcastMap = hazelcastInstance.getMap(domainName);
+
+
+
+
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private HazelcastInstance createHazelcastInstance(boolean multicast, int listenPort, int... connectPorts) throws UnknownHostException {
+ Config config = new XmlConfigBuilder().build();
+ config.setPort(listenPort);
+ config.setPortAutoIncrement(false);
+
+ // declare the interface Hazelcast should bind to
+ config.getNetworkConfig().getInterfaces().clear();
+ config.getNetworkConfig().getInterfaces().addInterface(InetAddress.getLocalHost().getHostAddress());
+ config.getNetworkConfig().getInterfaces().setEnabled(true);
+
+ if (!multicast) {
+ config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);
+ }
+
+ if (connectPorts.length > 0) {
+ TcpIpConfig tcpconfig = config.getNetworkConfig().getJoin().getJoinMembers();
+ tcpconfig.setEnabled(true);
+
+ List<Address> lsMembers = tcpconfig.getAddresses();
+ lsMembers.clear();
+ for (int p : connectPorts) {
+ lsMembers.add(new Address(InetAddress.getLocalHost(), p));
+ }
+ }
+
+ return Hazelcast.newHazelcastInstance(config);
+ }
+}
diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/ReplicatedEndpointRegistry.java b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/ReplicatedEndpointRegistry.java
new file mode 100644
index 0000000000..e61af743fb
--- /dev/null
+++ b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/ReplicatedEndpointRegistry.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.hazelcast;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.URI;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
+import org.apache.catalina.tribes.membership.McastService;
+import org.apache.catalina.tribes.membership.StaticMember;
+import org.apache.tuscany.sca.assembly.Endpoint;
+import org.apache.tuscany.sca.assembly.EndpointReference;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.LifeCycleListener;
+import org.apache.tuscany.sca.endpoint.tribes.AbstractReplicatedMap.MapEntry;
+import org.apache.tuscany.sca.endpoint.tribes.MapStore.MapListener;
+import org.apache.tuscany.sca.runtime.EndpointListener;
+import org.apache.tuscany.sca.runtime.EndpointRegistry;
+import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+
+import com.hazelcast.core.IMap;
+
+/**
+ * A replicated EndpointRegistry based on Apache Tomcat Tribes
+ */
+public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleListener {
+ private final static Logger logger = Logger.getLogger(ReplicatedEndpointRegistry.class.getName());
+ private static final String MULTICAST_ADDRESS = "228.0.0.100";
+ private static final int MULTICAST_PORT = 50000;
+
+ private static final int FIND_REPEAT_COUNT = 10;
+
+ private int port = MULTICAST_PORT;
+ private String address = MULTICAST_ADDRESS;
+ private String bind = null;
+ private int timeout = 50;
+
+ private final static String DEFAULT_DOMAIN_URI = "http://tuscany.apache.org/sca/1.1/domains/default";
+ private String domainURI = DEFAULT_DOMAIN_URI;
+ private List<EndpointReference> endpointreferences = new CopyOnWriteArrayList<EndpointReference>();
+ private List<EndpointListener> listeners = new CopyOnWriteArrayList<EndpointListener>();
+
+ private ExtensionPointRegistry registry;
+ private IMap<Object, Object> map;
+ private static List<URI> staticRoutes;
+
+ private String id;
+ private boolean noMultiCast;
+
+// private static final GroupChannel createChannel(String address, int port, String bindAddress) {
+//
+// //create a channel
+// GroupChannel channel = new GroupChannel();
+// McastService mcastService = (McastService)channel.getMembershipService();
+// mcastService.setPort(port);
+// mcastService.setAddress(address);
+//
+// // REVIEW: In my case, there are multiple IP addresses
+// // One for the WIFI and the other one for VPN. For some reason the VPN one doesn't support
+// // Multicast
+//
+// if (bindAddress != null) {
+// mcastService.setBind(bindAddress);
+// } else {
+// mcastService.setBind(getBindAddress());
+// }
+//
+// return channel;
+// }
+
+ public ReplicatedEndpointRegistry(ExtensionPointRegistry registry,
+ Map<String, String> attributes,
+ String domainRegistryURI,
+ String domainURI) {
+ this.registry = registry;
+ this.domainURI = domainURI;
+ this.id = "[" + System.identityHashCode(this) + "]";
+ getParameters(attributes, domainRegistryURI);
+ }
+
+ private Map<String, String> getParameters(Map<String, String> attributes, String domainRegistryURI) {
+ Map<String, String> map = new HashMap<String, String>();
+ if (attributes != null) {
+ map.putAll(attributes);
+ }
+ URI uri = URI.create(domainRegistryURI);
+ if (uri.getHost() != null) {
+ map.put("address", uri.getHost());
+ }
+ if (uri.getPort() != -1) {
+ map.put("port", String.valueOf(uri.getPort()));
+ }
+ int index = domainRegistryURI.indexOf('?');
+ if (index == -1) {
+ setConfig(map);
+ return map;
+ }
+ String query = domainRegistryURI.substring(index + 1);
+ try {
+ query = URLDecoder.decode(query, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new IllegalArgumentException(e);
+ }
+ String[] params = query.split("&");
+ for (String param : params) {
+ index = param.indexOf('=');
+ if (index != -1) {
+ map.put(param.substring(0, index), param.substring(index + 1));
+ }
+ }
+ setConfig(map);
+ return map;
+ }
+
+ private void setConfig(Map<String, String> attributes) {
+ String portStr = attributes.get("port");
+ if (portStr != null) {
+ port = Integer.parseInt(portStr);
+ if (port == -1) {
+ port = MULTICAST_PORT;
+ }
+ }
+ String address = attributes.get("address");
+ if (address == null) {
+ address = MULTICAST_ADDRESS;
+ }
+ bind = attributes.get("bind");
+ String timeoutStr = attributes.get("timeout");
+ if (timeoutStr != null) {
+ timeout = Integer.parseInt(timeoutStr);
+ }
+
+ String routesStr = attributes.get("routes");
+ if (routesStr != null) {
+ StringTokenizer st = new StringTokenizer(routesStr);
+ staticRoutes = new ArrayList<URI>();
+ while (st.hasMoreElements()) {
+ staticRoutes.add(URI.create("tcp://" + st.nextToken()));
+ }
+ }
+ String mcast = attributes.get("nomcast");
+ if (mcast != null) {
+ noMultiCast = Boolean.valueOf(mcast);
+ }
+ }
+
+ public void start() {
+ if (map != null) {
+ throw new IllegalStateException("The registry has already been started");
+ }
+ GroupChannel channel = createChannel(address, port, bind);
+ map =
+ new ReplicatedMap(null, channel, timeout, this.domainURI,
+ new ClassLoader[] {ReplicatedEndpointRegistry.class.getClassLoader()});
+ map.addListener(this);
+
+ if (noMultiCast) {
+ map.getChannel().addInterceptor(new DisableMcastInterceptor());
+ }
+
+ // http://www.mail-archive.com/users@tomcat.apache.org/msg24873.html
+ int port = channel.getChannelReceiver().getPort();
+
+ if (staticRoutes != null) {
+ StaticMembershipInterceptor smi = new StaticMembershipInterceptor();
+ for (URI staticRoute : staticRoutes) {
+ Member member;
+ try {
+ // The port has to match the receiver port
+ member = new StaticMember(staticRoute.getHost(), port, 5000);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ smi.addStaticMember(member);
+ logger.info("Added static route: " + staticRoute.getHost() + ":" + port);
+ }
+ smi.setLocalMember(map.getChannel().getLocalMember(false));
+ map.getChannel().addInterceptor(smi);
+ }
+
+ try {
+ map.getChannel().start(Channel.DEFAULT);
+ } catch (ChannelException e) {
+ throw new IllegalStateException(e);
+ }
+
+ }
+
+ public void stop() {
+ if (map != null) {
+ map.removeListener(this);
+ Channel channel = map.getChannel();
+ map.breakdown();
+ try {
+ channel.stop(Channel.DEFAULT);
+ } catch (ChannelException e) {
+ logger.log(Level.WARNING, e.getMessage(), e);
+ }
+ map = null;
+ }
+ }
+
+ public void addEndpoint(Endpoint endpoint) {
+ map.put(endpoint.getURI(), endpoint);
+ logger.info("Add endpoint - " + endpoint);
+ }
+
+ public void addEndpointReference(EndpointReference endpointReference) {
+ endpointreferences.add(endpointReference);
+ logger.fine("Add endpoint reference - " + endpointReference);
+ }
+
+ public void addListener(EndpointListener listener) {
+ listeners.add(listener);
+ }
+
+ /**
+ * Parse the component/service/binding URI into an array of parts (componentURI, serviceName, bindingName)
+ * @param uri
+ * @return
+ */
+ private String[] parse(String uri) {
+ String[] names = new String[3];
+ int index = uri.lastIndexOf('#');
+ if (index == -1) {
+ names[0] = uri;
+ } else {
+ names[0] = uri.substring(0, index);
+ String str = uri.substring(index + 1);
+ if (str.startsWith("service-binding(") && str.endsWith(")")) {
+ str = str.substring("service-binding(".length(), str.length() - 1);
+ String[] parts = str.split("/");
+ if (parts.length != 2) {
+ throw new IllegalArgumentException("Invalid service-binding URI: " + uri);
+ }
+ names[1] = parts[0];
+ names[2] = parts[1];
+ } else if (str.startsWith("service(") && str.endsWith(")")) {
+ str = str.substring("service(".length(), str.length() - 1);
+ names[1] = str;
+ } else {
+ throw new IllegalArgumentException("Invalid component/service/binding URI: " + uri);
+ }
+ }
+ return names;
+ }
+
+ private boolean matches(String target, String uri) {
+ String[] parts1 = parse(target);
+ String[] parts2 = parse(uri);
+ for (int i = 0; i < parts1.length; i++) {
+ if (parts1[i] == null || parts1[i].equals(parts2[i])) {
+ continue;
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public List<Endpoint> findEndpoint(EndpointReference endpointReference) {
+ List<Endpoint> foundEndpoints = new ArrayList<Endpoint>();
+
+ logger.fine("Find endpoint for reference - " + endpointReference);
+
+ if (endpointReference.getReference() != null) {
+ Endpoint targetEndpoint = endpointReference.getTargetEndpoint();
+
+ // in the failure case we repeat the look up after a short
+ // delay to take account of tribes replication delays
+ int repeat = FIND_REPEAT_COUNT;
+
+ while (repeat > 0){
+ for (Object v : map.values()) {
+ Endpoint endpoint = (Endpoint)v;
+ // TODO: implement more complete matching
+ logger.fine("Matching against - " + endpoint);
+ if (matches(targetEndpoint.getURI(), endpoint.getURI())) {
+ MapEntry entry = map.getInternal(endpoint.getURI());
+ if (!isLocal(entry)) {
+ endpoint.setRemote(true);
+ }
+ // if (!entry.isPrimary()) {
+ ((RuntimeEndpoint) endpoint).bind(registry, this);
+ // }
+ foundEndpoints.add(endpoint);
+ logger.fine("Found endpoint with matching service - " + endpoint);
+ repeat = 0;
+ }
+ // else the service name doesn't match
+ }
+
+ if (foundEndpoints.size() == 0) {
+ // the service name doesn't match any endpoints so wait a little and try
+ // again in case this is caused by tribes synch delays
+ logger.info("Repeating endpoint reference match - " + endpointReference);
+ repeat--;
+ try {
+ Thread.sleep(1000);
+ } catch(Exception ex){
+ // do nothing
+ repeat=0;
+ }
+ }
+ }
+ }
+
+ return foundEndpoints;
+ }
+
+ private boolean isLocal(MapEntry entry) {
+ return entry.getPrimary().equals(map.getChannel().getLocalMember(false));
+ map.
+ }
+
+ public List<EndpointReference> findEndpointReference(Endpoint endpoint) {
+ return endpointreferences;
+ }
+
+ public Endpoint getEndpoint(String uri) {
+ return (Endpoint)map.get(uri);
+ }
+
+ public List<EndpointReference> getEndpointReferences() {
+ return endpointreferences;
+ }
+
+ public List<Endpoint> getEndpoints() {
+ return new ArrayList(map.values());
+ }
+
+ public List<EndpointListener> getListeners() {
+ return listeners;
+ }
+
+ public void removeEndpoint(Endpoint endpoint) {
+ map.remove(endpoint.getURI());
+ logger.info("Remove endpoint - " + endpoint);
+ }
+
+ public void removeEndpointReference(EndpointReference endpointReference) {
+ endpointreferences.remove(endpointReference);
+ logger.fine("Remove endpoint reference - " + endpointReference);
+ }
+
+ public void removeListener(EndpointListener listener) {
+ listeners.remove(listener);
+ }
+
+ public void updateEndpoint(String uri, Endpoint endpoint) {
+// // TODO: is updateEndpoint needed?
+// throw new UnsupportedOperationException();
+ }
+
+// public void entryAdded(Object key, Object value) {
+// MapEntry entry = (MapEntry)value;
+// Endpoint newEp = (Endpoint)entry.getValue();
+// if (!isLocal(entry)) {
+// logger.info(id + " Remote endpoint added: " + entry.getValue());
+// newEp.setRemote(true);
+// }
+// ((RuntimeEndpoint) newEp).bind(registry, this);
+// for (EndpointListener listener : listeners) {
+// listener.endpointAdded(newEp);
+// }
+// }
+//
+// public void entryRemoved(Object key, Object value) {
+// MapEntry entry = (MapEntry)value;
+// if (!isLocal(entry)) {
+// logger.info(id + " Remote endpoint removed: " + entry.getValue());
+// }
+// Endpoint oldEp = (Endpoint)entry.getValue();
+// for (EndpointListener listener : listeners) {
+// listener.endpointRemoved(oldEp);
+// }
+// }
+//
+// public void entryUpdated(Object key, Object oldValue, Object newValue) {
+// MapEntry oldEntry = (MapEntry)oldValue;
+// MapEntry newEntry = (MapEntry)newValue;
+// if (!isLocal(newEntry)) {
+// logger.info(id + " Remote endpoint updated: " + newEntry.getValue());
+// }
+// Endpoint oldEp = (Endpoint)oldEntry.getValue();
+// Endpoint newEp = (Endpoint)newEntry.getValue();
+// ((RuntimeEndpoint) newEp).bind(registry, this);
+// for (EndpointListener listener : listeners) {
+// listener.endpointUpdated(oldEp, newEp);
+// }
+// }
+
+ private static String getBindAddress() {
+ try {
+ Enumeration<NetworkInterface> nis = NetworkInterface.getNetworkInterfaces();
+ while (nis.hasMoreElements()) {
+ NetworkInterface ni = nis.nextElement();
+ // The following APIs require JDK 1.6
+ /*
+ if (ni.isLoopback() || !ni.isUp() || !ni.supportsMulticast()) {
+ continue;
+ }
+ */
+ Enumeration<InetAddress> ips = ni.getInetAddresses();
+ if (!ips.hasMoreElements()) {
+ continue;
+ }
+ while (ips.hasMoreElements()) {
+ InetAddress addr = ips.nextElement();
+ if (addr.isLoopbackAddress()) {
+ continue;
+ }
+ return addr.getHostAddress();
+ }
+ }
+ return InetAddress.getLocalHost().getHostAddress();
+ } catch (Exception e) {
+ logger.log(Level.SEVERE, e.getMessage(), e);
+ return null;
+ }
+ }
+
+}
diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.EndpointRegistry b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.EndpointRegistry
new file mode 100644
index 0000000000..626935e71e
--- /dev/null
+++ b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.EndpointRegistry
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+org.apache.tuscany.sca.endpoint.hazelcast.HazelcastRegistry;ranking=150,address=228.0.0.100,port=50000,timeout=50,scheme=tuscany
+
diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/RegistryTestCase.java b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/RegistryTestCase.java
new file mode 100644
index 0000000000..01052fa0f5
--- /dev/null
+++ b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/RegistryTestCase.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.hazelcast;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.TcpIpConfig;
+import com.hazelcast.config.XmlConfigBuilder;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+import com.hazelcast.nio.Address;
+
+public class RegistryTestCase {
+
+ @Test
+ public void test1() throws UnknownHostException {
+
+ HazelcastInstance h1 = create("54327", 9001);
+
+ IMap<Object, Object> h1map = h1.getMap("mymap");
+ h1map.put("key1", "bla1");
+ Assert.assertEquals("bla1", h1map.get("key1"));
+
+ HazelcastInstance h2 = create("false", 9002, 9001);
+ IMap<Object, Object> h2map = h2.getMap("mymap");
+ Assert.assertEquals("bla1", h2map.get("key1"));
+
+ HazelcastInstance h3 = create("false", 9003, 9002);
+ IMap<Object, Object> h3map = h3.getMap("mymap");
+ Assert.assertEquals("bla1", h3map.get("key1"));
+
+ h3map.put("k3", "v3");
+ h2map.put("k2", "v2");
+
+ Assert.assertEquals("v2", h1map.get("k2"));
+ Assert.assertEquals("v3", h1map.get("k3"));
+ Assert.assertEquals("v2", h2map.get("k2"));
+ Assert.assertEquals("v3", h2map.get("k3"));
+ Assert.assertEquals("v2", h3map.get("k2"));
+ Assert.assertEquals("v3", h3map.get("k3"));
+
+ HazelcastInstance h4 = create("54328", 9004, 9001);
+ IMap<Object, Object> h4map = h4.getMap("mymap");
+// Assert.assertNull(h4map.get("k2"));
+// Assert.assertNull(h4map.get("k3"));
+ Assert.assertEquals("v2", h4map.get("k2"));
+ Assert.assertEquals("v3", h4map.get("k3"));
+
+// HazelcastInstance h5 = create("false", 9005, 9003, 9004);
+ HazelcastInstance h5 = create("54328", 9005);
+
+// Assert.assertEquals("v2", h4map.get("k2"));
+// Assert.assertEquals("v3", h4map.get("k3"));
+
+ IMap<Object, Object> h5map = h5.getMap("mymap");
+ Assert.assertEquals("v2", h5map.get("k2"));
+ Assert.assertEquals("v3", h5map.get("k3"));
+
+ h1.shutdown();
+
+ Assert.assertEquals("v2", h2map.get("k2"));
+ Assert.assertEquals("v3", h2map.get("k3"));
+ Assert.assertEquals("v2", h3map.get("k2"));
+ Assert.assertEquals("v3", h3map.get("k3"));
+ Assert.assertEquals("v2", h4map.get("k2"));
+ Assert.assertEquals("v3", h4map.get("k3"));
+
+ h3map.put("key1a", "bla1a");
+
+ Assert.assertEquals("bla1a", h2map.get("key1a"));
+ Assert.assertEquals("bla1a", h3map.get("key1a"));
+ Assert.assertEquals("bla1a", h4map.get("key1a"));
+
+// HazelcastInstance h4 = create(true, 9004, 9003);
+// HazelcastInstance h5 = create(true, 9005);
+// IMap<Object, Object> h5map = h5.getMap("mymap");
+// Assert.assertEquals("bla1", h5map.get("key1"));
+
+// HazelcastInstance h6 = create(false, 9006, 9005);
+// IMap<Object, Object> h6map = h6.getMap("mymap");
+// Assert.assertEquals("bla1", h6map.get("key1"));
+
+ }
+
+ private HazelcastInstance create(String multicast, int listenPort, int... connectPorts) throws UnknownHostException {
+ Config config = new XmlConfigBuilder().build();
+ config.setPort(listenPort);
+ config.setPortAutoIncrement(false);
+
+ // declare the interface Hazelcast should bind to
+ config.getNetworkConfig().getInterfaces().clear();
+ config.getNetworkConfig().getInterfaces().addInterface(InetAddress.getLocalHost().getHostAddress());
+ config.getNetworkConfig().getInterfaces().setEnabled(true);
+
+ if ("false".equals(multicast)) {
+ config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);
+ } else {
+ config.getNetworkConfig().getJoin().getMulticastConfig().setMulticastPort(Integer.parseInt(multicast));
+ }
+
+ if (connectPorts.length > 0) {
+ TcpIpConfig tcpconfig = config.getNetworkConfig().getJoin().getJoinMembers();
+ tcpconfig.setEnabled(true);
+
+ List<Address> lsMembers = tcpconfig.getAddresses();
+ lsMembers.clear();
+ for (int p : connectPorts) {
+ lsMembers.add(new Address(InetAddress.getLocalHost(), p));
+ }
+ }
+
+ return Hazelcast.newHazelcastInstance(config);
+ }
+
+}