summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorantelder <antelder@13f79535-47bb-0310-9956-ffa450edef68>2010-01-05 23:25:33 +0000
committerantelder <antelder@13f79535-47bb-0310-9956-ffa450edef68>2010-01-05 23:25:33 +0000
commit6defaf4067f5af0e86426deaaae4830d3da47236 (patch)
treec2b9aac7137492e8eb7b66193075e1c7ebdc47e5
parent7100b31b4fc3501b6c465dbc977548a386d323cf (diff)
More changes to get the Hazelcast endpoint registry working
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@896269 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/ConfigURI.java158
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java293
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastRegistry.java119
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/ReplicatedEndpointRegistry.java458
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.EndpointRegistry2
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/ConfigURITestCase.java106
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/RegistryTestCase.java2
7 files changed, 560 insertions, 578 deletions
diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/ConfigURI.java b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/ConfigURI.java
new file mode 100644
index 0000000000..c226405696
--- /dev/null
+++ b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/ConfigURI.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.hazelcast;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Utility to parse the config uri string.
+ *
+ * tuscany:[domainName]?listen=[port|ip:port]]&password=abc&multicast=[off|port|ip:port]&remotes=ip:port,ip:port,...
+
+ * listen - defines the local bind address and port, it defaults to all network interfaces on port 14820 and if that port in use it will try incrementing by one till a free port is found.
+ * password - is the password other nodes must use to connect to this domain. The default is 'tuscany'.
+ * multicast - defines if multicast discovery is used and if so what multicast ip group and port is used.
+ * The default is multicast is off if remotes= is specified (only for now due to a Hazelcast limitation that is planned to be fixed),
+ * otherwise if remotes= is not specified then multicast defaults to 224.5.12.10:51482
+ * remotes - a list of ipAddress:port for remote nodes
+ *
+ * TODO: move this to a base module as it will also be used by the SCAClient
+ * add JCA encryption config
+ */
+public class ConfigURI {
+
+ private String domainName = "default";
+ private String bindAddress = null; // null means all network adapters
+ private int listenPort = 14820;
+ private String password = "tuscany";
+ private boolean multicastDisabled = false;
+ private String multicastAddress = "224.5.12.10";
+ private int multicastPort = 51482;
+ private List<String> remotes = new ArrayList<String>();
+
+ public ConfigURI(String uri) {
+ parseURI(uri);
+ }
+
+ private void parseURI(String uri) {
+ if (!uri.startsWith("tuscany:")) {
+ throw new IllegalArgumentException("Config URI must start with 'tuscany:'");
+ }
+
+ // make it a URI so java.net.URI can be used to parse it
+ int i = uri.indexOf(":");
+ if (uri.charAt("tuscany:".length()+1) != '/') {
+ uri = uri.replaceFirst(":", ":/");
+ }
+ if (uri.charAt("tuscany:".length()+2) != '/') {
+ uri = uri.replaceFirst(":/", "://");
+ }
+ URI configURI = URI.create(uri);
+
+ this.domainName = configURI.getHost();
+
+ String query = configURI.getQuery();
+ if (query != null && query.length() > 0) {
+ String[] params = query.split("&");
+ Map<String, String> paramMap = new HashMap<String, String>();
+ for (String param : params) {
+ paramMap.put(param.split("=")[0], param.split("=")[1]);
+ }
+ for (String name : paramMap.keySet()) {
+ String value = paramMap.get(name);
+ if ("listen".equals(name)) {
+ if (value.indexOf(":") == -1) {
+ this.listenPort = Integer.parseInt(value);
+ } else {
+ String[] addr = value.split(":");
+ this.bindAddress = addr[0];
+ this.listenPort = Integer.parseInt(addr[1]);
+ }
+ } else if ("multicast".equals(name)) {
+ if ("off".equalsIgnoreCase(value)) {
+ this.multicastDisabled = true;
+ } else {
+ if (value.indexOf(":") == -1) {
+ this.multicastAddress = value;
+ } else {
+ String[] addr = value.split(":");
+ this.multicastAddress = addr[0];
+ this.multicastPort = Integer.parseInt(addr[1]);
+ }
+ }
+ } else if ("password".equals(name)) {
+ this.password = value;
+ } else if ("remotes".equals(name)) {
+ String[] ips = value.split(",");
+ for (String ip : ips) {
+ if (ip.indexOf(":") == -1) {
+ remotes.add(ip + ":14820");
+ } else {
+ remotes.add(ip);
+ }
+ }
+ if (paramMap.containsKey("multicast")) {
+ throw new IllegalArgumentException("Cannot have multicast and remotes (for now)");
+ } else {
+ this.multicastDisabled = true;
+ }
+ }
+ }
+ }
+ }
+
+ public String getDomainName() {
+ return domainName;
+ }
+
+ public String getBindAddress() {
+ return bindAddress;
+ }
+
+ public int getListenPort() {
+ return listenPort;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public boolean isMulticastDisabled() {
+ return multicastDisabled;
+ }
+
+ public String getMulticastAddress() {
+ return multicastAddress;
+ }
+
+ public int getMulticastPort() {
+ return multicastPort;
+ }
+
+ public List<String> getRemotes() {
+ return remotes;
+ }
+
+}
diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java
new file mode 100644
index 0000000000..54c1d1c0a5
--- /dev/null
+++ b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.hazelcast;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.logging.Logger;
+
+import org.apache.tuscany.sca.assembly.Endpoint;
+import org.apache.tuscany.sca.assembly.EndpointReference;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.LifeCycleListener;
+import org.apache.tuscany.sca.runtime.EndpointListener;
+import org.apache.tuscany.sca.runtime.EndpointRegistry;
+import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.TcpIpConfig;
+import com.hazelcast.config.XmlConfigBuilder;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+import com.hazelcast.nio.Address;
+
+/**
+ * An EndpointRegistry using a Hazelcast
+ */
+public class HazelcastEndpointRegistry implements EndpointRegistry, LifeCycleListener {
+ private final static Logger logger = Logger.getLogger(HazelcastEndpointRegistry.class.getName());
+
+ private List<EndpointReference> endpointreferences = new CopyOnWriteArrayList<EndpointReference>();
+ private List<EndpointListener> listeners = new CopyOnWriteArrayList<EndpointListener>();
+
+ private ExtensionPointRegistry registry;
+ private ConfigURI configURI;
+
+ private HazelcastInstance hazelcastInstance;
+ private IMap<Object, Object> map;
+ private List<String> localEndpoints = new ArrayList<String>();;
+
+ public HazelcastEndpointRegistry(ExtensionPointRegistry registry,
+ Map<String, String> attributes,
+ String domainRegistryURI,
+ String domainURI) {
+ this.registry = registry;
+ this.configURI = new ConfigURI(domainRegistryURI);
+ }
+
+ public void start() {
+ if (map != null) {
+ throw new IllegalStateException("The registry has already been started");
+ }
+ initHazelcastInstance();
+ map = hazelcastInstance.getMap(configURI.getDomainName() + "Endpoints");
+ }
+
+ public void stop() {
+ if (map != null) {
+ hazelcastInstance.shutdown();
+ }
+ }
+
+ private void initHazelcastInstance() {
+ Config config = new XmlConfigBuilder().build();
+
+ config.setPort(configURI.getListenPort());
+ //config.setPortAutoIncrement(false);
+
+ if (configURI.getBindAddress() != null) {
+ config.getNetworkConfig().getInterfaces().setEnabled(true);
+ config.getNetworkConfig().getInterfaces().clear();
+ config.getNetworkConfig().getInterfaces().addInterface(configURI.getBindAddress());
+ }
+
+ config.getGroupConfig().setName(configURI.getDomainName());
+ config.getGroupConfig().setPassword(configURI.getPassword());
+
+ if (configURI.isMulticastDisabled()) {
+ config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);
+ } else {
+ config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(true);
+ config.getNetworkConfig().getJoin().getMulticastConfig().setMulticastPort(configURI.getMulticastPort());
+ config.getNetworkConfig().getJoin().getMulticastConfig().setMulticastGroup(configURI.getMulticastAddress());
+ }
+
+ if (configURI.getRemotes().size() > 0) {
+ TcpIpConfig tcpconfig = config.getNetworkConfig().getJoin().getJoinMembers();
+ tcpconfig.setEnabled(true);
+ List<Address> lsMembers = tcpconfig.getAddresses();
+ lsMembers.clear();
+ for (String addr : configURI.getRemotes()) {
+ String[] ipNPort = addr.split(":");
+ try {
+ lsMembers.add(new Address(ipNPort[0], Integer.parseInt(ipNPort[1])));
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ this.hazelcastInstance = Hazelcast.newHazelcastInstance(config);
+ }
+
+ public void addEndpoint(Endpoint endpoint) {
+ map.put(endpoint.getURI(), endpoint);
+ localEndpoints.add(endpoint.getURI());
+ logger.info("Add endpoint - " + endpoint);
+ }
+
+ public void addEndpointReference(EndpointReference endpointReference) {
+ endpointreferences.add(endpointReference);
+ logger.fine("Add endpoint reference - " + endpointReference);
+ }
+
+ public void addListener(EndpointListener listener) {
+ listeners.add(listener);
+ }
+
+ /**
+ * Parse the component/service/binding URI into an array of parts (componentURI, serviceName, bindingName)
+ * @param uri
+ * @return
+ */
+ private String[] parse(String uri) {
+ String[] names = new String[3];
+ int index = uri.lastIndexOf('#');
+ if (index == -1) {
+ names[0] = uri;
+ } else {
+ names[0] = uri.substring(0, index);
+ String str = uri.substring(index + 1);
+ if (str.startsWith("service-binding(") && str.endsWith(")")) {
+ str = str.substring("service-binding(".length(), str.length() - 1);
+ String[] parts = str.split("/");
+ if (parts.length != 2) {
+ throw new IllegalArgumentException("Invalid service-binding URI: " + uri);
+ }
+ names[1] = parts[0];
+ names[2] = parts[1];
+ } else if (str.startsWith("service(") && str.endsWith(")")) {
+ str = str.substring("service(".length(), str.length() - 1);
+ names[1] = str;
+ } else {
+ throw new IllegalArgumentException("Invalid component/service/binding URI: " + uri);
+ }
+ }
+ return names;
+ }
+
+ private boolean matches(String target, String uri) {
+ String[] parts1 = parse(target);
+ String[] parts2 = parse(uri);
+ for (int i = 0; i < parts1.length; i++) {
+ if (parts1[i] == null || parts1[i].equals(parts2[i])) {
+ continue;
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public List<Endpoint> findEndpoint(EndpointReference endpointReference) {
+ List<Endpoint> foundEndpoints = new ArrayList<Endpoint>();
+
+ logger.fine("Find endpoint for reference - " + endpointReference);
+
+ if (endpointReference.getReference() != null) {
+ Endpoint targetEndpoint = endpointReference.getTargetEndpoint();
+
+ for (Object v : map.values()) {
+ Endpoint endpoint = (Endpoint)v;
+ logger.fine("Matching against - " + endpoint);
+ if (matches(targetEndpoint.getURI(), endpoint.getURI())) {
+ if (!isLocal(endpoint)) {
+ endpoint.setRemote(true);
+ }
+ // if (!entry.isPrimary()) {
+ ((RuntimeEndpoint) endpoint).bind(registry, this);
+ // }
+ foundEndpoints.add(endpoint);
+ logger.fine("Found endpoint with matching service - " + endpoint);
+ }
+ // else the service name doesn't match
+ }
+ }
+
+ return foundEndpoints;
+ }
+
+ private boolean isLocal(Endpoint endpoint) {
+ return localEndpoints.contains(endpoint.getURI());
+ }
+
+ public List<EndpointReference> findEndpointReference(Endpoint endpoint) {
+ return endpointreferences;
+ }
+
+ public Endpoint getEndpoint(String uri) {
+ return (Endpoint)map.get(uri);
+ }
+
+ public List<EndpointReference> getEndpointReferences() {
+ return endpointreferences;
+ }
+
+ public List<Endpoint> getEndpoints() {
+ return new ArrayList(map.values());
+ }
+
+ public List<EndpointListener> getListeners() {
+ return listeners;
+ }
+
+ public void removeEndpoint(Endpoint endpoint) {
+ map.remove(endpoint.getURI());
+ localEndpoints.remove(endpoint.getURI());
+ logger.info("Removed endpoint - " + endpoint);
+ }
+
+ public void removeEndpointReference(EndpointReference endpointReference) {
+ endpointreferences.remove(endpointReference);
+ logger.fine("Remove endpoint reference - " + endpointReference);
+ }
+
+ public void removeListener(EndpointListener listener) {
+ listeners.remove(listener);
+ }
+
+ public void updateEndpoint(String uri, Endpoint endpoint) {
+// // TODO: is updateEndpoint needed?
+// throw new UnsupportedOperationException();
+ }
+
+// public void entryAdded(Object key, Object value) {
+// MapEntry entry = (MapEntry)value;
+// Endpoint newEp = (Endpoint)entry.getValue();
+// if (!isLocal(entry)) {
+// logger.info(id + " Remote endpoint added: " + entry.getValue());
+// newEp.setRemote(true);
+// }
+// ((RuntimeEndpoint) newEp).bind(registry, this);
+// for (EndpointListener listener : listeners) {
+// listener.endpointAdded(newEp);
+// }
+// }
+//
+// public void entryRemoved(Object key, Object value) {
+// MapEntry entry = (MapEntry)value;
+// if (!isLocal(entry)) {
+// logger.info(id + " Remote endpoint removed: " + entry.getValue());
+// }
+// Endpoint oldEp = (Endpoint)entry.getValue();
+// for (EndpointListener listener : listeners) {
+// listener.endpointRemoved(oldEp);
+// }
+// }
+//
+// public void entryUpdated(Object key, Object oldValue, Object newValue) {
+// MapEntry oldEntry = (MapEntry)oldValue;
+// MapEntry newEntry = (MapEntry)newValue;
+// if (!isLocal(newEntry)) {
+// logger.info(id + " Remote endpoint updated: " + newEntry.getValue());
+// }
+// Endpoint oldEp = (Endpoint)oldEntry.getValue();
+// Endpoint newEp = (Endpoint)newEntry.getValue();
+// ((RuntimeEndpoint) newEp).bind(registry, this);
+// for (EndpointListener listener : listeners) {
+// listener.endpointUpdated(oldEp, newEp);
+// }
+// }
+
+}
diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastRegistry.java b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastRegistry.java
deleted file mode 100644
index 15247f0035..0000000000
--- a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastRegistry.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tuscany.sca.endpoint.hazelcast;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.List;
-
-import org.apache.tuscany.sca.assembly.Endpoint;
-import org.apache.tuscany.sca.core.ExtensionPointRegistry;
-import org.apache.tuscany.sca.core.assembly.impl.EndpointRegistryImpl;
-
-import com.hazelcast.config.Config;
-import com.hazelcast.config.TcpIpConfig;
-import com.hazelcast.config.XmlConfigBuilder;
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.core.IMap;
-import com.hazelcast.nio.Address;
-
-/**
- * tuscany:[domainName]?listen=[port|ip:port]]&password=abc&multicast=[off|port|ip:port]&remotes=ip:port,ip:port,...
- * listen defines the local bind address and port, it defaults to all network interfaces on port 14820 and if that port in use it will try incrementing by one till a free port is found.
- * password is the password other nodes must use to connect to this domain. The default is 'tuscany'.
- * multicast defines if multicast discover is used and if so what multicast ip group and port is used. The default is multicast is off if remotes= is specified (only for now due to a code limitation that is planned to be fixed), other wise if remotes= is not specified then multicast defaults to 224.5.12.10:51482
- *
- */
-
-public class HazelcastRegistry extends EndpointRegistryImpl {
-
- private String endpointRegistryURI;
- private String domainURI;
- private IMap<Object, Object> hazelcastMap;
-
- public HazelcastRegistry(ExtensionPointRegistry extensionPoints, String endpointRegistryURI, String domainURI) {
- super(extensionPoints, endpointRegistryURI, domainURI);
- init();
- }
-
- @Override
- public synchronized void addEndpoint(Endpoint endpoint) {
- hazelcastMap.put(endpoint.getURI(), endpoint);
- super.addEndpoint(endpoint);
- }
-
- @Override
- public synchronized void removeEndpoint(Endpoint endpoint) {
- hazelcastMap.remove(endpoint.getURI());
- super.addEndpoint(endpoint);
- }
-
- @Override
- public synchronized void updateEndpoint(String uri, Endpoint endpoint) {
- // TODO: is updateEndpoint needed?
- throw new UnsupportedOperationException();
- }
-
- protected void init() {
- int listenPort = 0;
- int connectPorts = 0;
- boolean multicast = false;
- try {
- HazelcastInstance hazelcastInstance = createHazelcastInstance(multicast, listenPort, connectPorts);
- String domainName = "";
- hazelcastMap = hazelcastInstance.getMap(domainName);
-
-
-
-
- } catch (UnknownHostException e) {
- throw new RuntimeException(e);
- }
- }
-
- private HazelcastInstance createHazelcastInstance(boolean multicast, int listenPort, int... connectPorts) throws UnknownHostException {
- Config config = new XmlConfigBuilder().build();
- config.setPort(listenPort);
- config.setPortAutoIncrement(false);
-
- // declare the interface Hazelcast should bind to
- config.getNetworkConfig().getInterfaces().clear();
- config.getNetworkConfig().getInterfaces().addInterface(InetAddress.getLocalHost().getHostAddress());
- config.getNetworkConfig().getInterfaces().setEnabled(true);
-
- if (!multicast) {
- config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);
- }
-
- if (connectPorts.length > 0) {
- TcpIpConfig tcpconfig = config.getNetworkConfig().getJoin().getJoinMembers();
- tcpconfig.setEnabled(true);
-
- List<Address> lsMembers = tcpconfig.getAddresses();
- lsMembers.clear();
- for (int p : connectPorts) {
- lsMembers.add(new Address(InetAddress.getLocalHost(), p));
- }
- }
-
- return Hazelcast.newHazelcastInstance(config);
- }
-}
diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/ReplicatedEndpointRegistry.java b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/ReplicatedEndpointRegistry.java
deleted file mode 100644
index e61af743fb..0000000000
--- a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/ReplicatedEndpointRegistry.java
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tuscany.sca.endpoint.hazelcast;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.URI;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.group.GroupChannel;
-import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
-import org.apache.catalina.tribes.membership.McastService;
-import org.apache.catalina.tribes.membership.StaticMember;
-import org.apache.tuscany.sca.assembly.Endpoint;
-import org.apache.tuscany.sca.assembly.EndpointReference;
-import org.apache.tuscany.sca.core.ExtensionPointRegistry;
-import org.apache.tuscany.sca.core.LifeCycleListener;
-import org.apache.tuscany.sca.endpoint.tribes.AbstractReplicatedMap.MapEntry;
-import org.apache.tuscany.sca.endpoint.tribes.MapStore.MapListener;
-import org.apache.tuscany.sca.runtime.EndpointListener;
-import org.apache.tuscany.sca.runtime.EndpointRegistry;
-import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
-
-import com.hazelcast.core.IMap;
-
-/**
- * A replicated EndpointRegistry based on Apache Tomcat Tribes
- */
-public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleListener {
- private final static Logger logger = Logger.getLogger(ReplicatedEndpointRegistry.class.getName());
- private static final String MULTICAST_ADDRESS = "228.0.0.100";
- private static final int MULTICAST_PORT = 50000;
-
- private static final int FIND_REPEAT_COUNT = 10;
-
- private int port = MULTICAST_PORT;
- private String address = MULTICAST_ADDRESS;
- private String bind = null;
- private int timeout = 50;
-
- private final static String DEFAULT_DOMAIN_URI = "http://tuscany.apache.org/sca/1.1/domains/default";
- private String domainURI = DEFAULT_DOMAIN_URI;
- private List<EndpointReference> endpointreferences = new CopyOnWriteArrayList<EndpointReference>();
- private List<EndpointListener> listeners = new CopyOnWriteArrayList<EndpointListener>();
-
- private ExtensionPointRegistry registry;
- private IMap<Object, Object> map;
- private static List<URI> staticRoutes;
-
- private String id;
- private boolean noMultiCast;
-
-// private static final GroupChannel createChannel(String address, int port, String bindAddress) {
-//
-// //create a channel
-// GroupChannel channel = new GroupChannel();
-// McastService mcastService = (McastService)channel.getMembershipService();
-// mcastService.setPort(port);
-// mcastService.setAddress(address);
-//
-// // REVIEW: In my case, there are multiple IP addresses
-// // One for the WIFI and the other one for VPN. For some reason the VPN one doesn't support
-// // Multicast
-//
-// if (bindAddress != null) {
-// mcastService.setBind(bindAddress);
-// } else {
-// mcastService.setBind(getBindAddress());
-// }
-//
-// return channel;
-// }
-
- public ReplicatedEndpointRegistry(ExtensionPointRegistry registry,
- Map<String, String> attributes,
- String domainRegistryURI,
- String domainURI) {
- this.registry = registry;
- this.domainURI = domainURI;
- this.id = "[" + System.identityHashCode(this) + "]";
- getParameters(attributes, domainRegistryURI);
- }
-
- private Map<String, String> getParameters(Map<String, String> attributes, String domainRegistryURI) {
- Map<String, String> map = new HashMap<String, String>();
- if (attributes != null) {
- map.putAll(attributes);
- }
- URI uri = URI.create(domainRegistryURI);
- if (uri.getHost() != null) {
- map.put("address", uri.getHost());
- }
- if (uri.getPort() != -1) {
- map.put("port", String.valueOf(uri.getPort()));
- }
- int index = domainRegistryURI.indexOf('?');
- if (index == -1) {
- setConfig(map);
- return map;
- }
- String query = domainRegistryURI.substring(index + 1);
- try {
- query = URLDecoder.decode(query, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new IllegalArgumentException(e);
- }
- String[] params = query.split("&");
- for (String param : params) {
- index = param.indexOf('=');
- if (index != -1) {
- map.put(param.substring(0, index), param.substring(index + 1));
- }
- }
- setConfig(map);
- return map;
- }
-
- private void setConfig(Map<String, String> attributes) {
- String portStr = attributes.get("port");
- if (portStr != null) {
- port = Integer.parseInt(portStr);
- if (port == -1) {
- port = MULTICAST_PORT;
- }
- }
- String address = attributes.get("address");
- if (address == null) {
- address = MULTICAST_ADDRESS;
- }
- bind = attributes.get("bind");
- String timeoutStr = attributes.get("timeout");
- if (timeoutStr != null) {
- timeout = Integer.parseInt(timeoutStr);
- }
-
- String routesStr = attributes.get("routes");
- if (routesStr != null) {
- StringTokenizer st = new StringTokenizer(routesStr);
- staticRoutes = new ArrayList<URI>();
- while (st.hasMoreElements()) {
- staticRoutes.add(URI.create("tcp://" + st.nextToken()));
- }
- }
- String mcast = attributes.get("nomcast");
- if (mcast != null) {
- noMultiCast = Boolean.valueOf(mcast);
- }
- }
-
- public void start() {
- if (map != null) {
- throw new IllegalStateException("The registry has already been started");
- }
- GroupChannel channel = createChannel(address, port, bind);
- map =
- new ReplicatedMap(null, channel, timeout, this.domainURI,
- new ClassLoader[] {ReplicatedEndpointRegistry.class.getClassLoader()});
- map.addListener(this);
-
- if (noMultiCast) {
- map.getChannel().addInterceptor(new DisableMcastInterceptor());
- }
-
- // http://www.mail-archive.com/users@tomcat.apache.org/msg24873.html
- int port = channel.getChannelReceiver().getPort();
-
- if (staticRoutes != null) {
- StaticMembershipInterceptor smi = new StaticMembershipInterceptor();
- for (URI staticRoute : staticRoutes) {
- Member member;
- try {
- // The port has to match the receiver port
- member = new StaticMember(staticRoute.getHost(), port, 5000);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- smi.addStaticMember(member);
- logger.info("Added static route: " + staticRoute.getHost() + ":" + port);
- }
- smi.setLocalMember(map.getChannel().getLocalMember(false));
- map.getChannel().addInterceptor(smi);
- }
-
- try {
- map.getChannel().start(Channel.DEFAULT);
- } catch (ChannelException e) {
- throw new IllegalStateException(e);
- }
-
- }
-
- public void stop() {
- if (map != null) {
- map.removeListener(this);
- Channel channel = map.getChannel();
- map.breakdown();
- try {
- channel.stop(Channel.DEFAULT);
- } catch (ChannelException e) {
- logger.log(Level.WARNING, e.getMessage(), e);
- }
- map = null;
- }
- }
-
- public void addEndpoint(Endpoint endpoint) {
- map.put(endpoint.getURI(), endpoint);
- logger.info("Add endpoint - " + endpoint);
- }
-
- public void addEndpointReference(EndpointReference endpointReference) {
- endpointreferences.add(endpointReference);
- logger.fine("Add endpoint reference - " + endpointReference);
- }
-
- public void addListener(EndpointListener listener) {
- listeners.add(listener);
- }
-
- /**
- * Parse the component/service/binding URI into an array of parts (componentURI, serviceName, bindingName)
- * @param uri
- * @return
- */
- private String[] parse(String uri) {
- String[] names = new String[3];
- int index = uri.lastIndexOf('#');
- if (index == -1) {
- names[0] = uri;
- } else {
- names[0] = uri.substring(0, index);
- String str = uri.substring(index + 1);
- if (str.startsWith("service-binding(") && str.endsWith(")")) {
- str = str.substring("service-binding(".length(), str.length() - 1);
- String[] parts = str.split("/");
- if (parts.length != 2) {
- throw new IllegalArgumentException("Invalid service-binding URI: " + uri);
- }
- names[1] = parts[0];
- names[2] = parts[1];
- } else if (str.startsWith("service(") && str.endsWith(")")) {
- str = str.substring("service(".length(), str.length() - 1);
- names[1] = str;
- } else {
- throw new IllegalArgumentException("Invalid component/service/binding URI: " + uri);
- }
- }
- return names;
- }
-
- private boolean matches(String target, String uri) {
- String[] parts1 = parse(target);
- String[] parts2 = parse(uri);
- for (int i = 0; i < parts1.length; i++) {
- if (parts1[i] == null || parts1[i].equals(parts2[i])) {
- continue;
- } else {
- return false;
- }
- }
- return true;
- }
-
- public List<Endpoint> findEndpoint(EndpointReference endpointReference) {
- List<Endpoint> foundEndpoints = new ArrayList<Endpoint>();
-
- logger.fine("Find endpoint for reference - " + endpointReference);
-
- if (endpointReference.getReference() != null) {
- Endpoint targetEndpoint = endpointReference.getTargetEndpoint();
-
- // in the failure case we repeat the look up after a short
- // delay to take account of tribes replication delays
- int repeat = FIND_REPEAT_COUNT;
-
- while (repeat > 0){
- for (Object v : map.values()) {
- Endpoint endpoint = (Endpoint)v;
- // TODO: implement more complete matching
- logger.fine("Matching against - " + endpoint);
- if (matches(targetEndpoint.getURI(), endpoint.getURI())) {
- MapEntry entry = map.getInternal(endpoint.getURI());
- if (!isLocal(entry)) {
- endpoint.setRemote(true);
- }
- // if (!entry.isPrimary()) {
- ((RuntimeEndpoint) endpoint).bind(registry, this);
- // }
- foundEndpoints.add(endpoint);
- logger.fine("Found endpoint with matching service - " + endpoint);
- repeat = 0;
- }
- // else the service name doesn't match
- }
-
- if (foundEndpoints.size() == 0) {
- // the service name doesn't match any endpoints so wait a little and try
- // again in case this is caused by tribes synch delays
- logger.info("Repeating endpoint reference match - " + endpointReference);
- repeat--;
- try {
- Thread.sleep(1000);
- } catch(Exception ex){
- // do nothing
- repeat=0;
- }
- }
- }
- }
-
- return foundEndpoints;
- }
-
- private boolean isLocal(MapEntry entry) {
- return entry.getPrimary().equals(map.getChannel().getLocalMember(false));
- map.
- }
-
- public List<EndpointReference> findEndpointReference(Endpoint endpoint) {
- return endpointreferences;
- }
-
- public Endpoint getEndpoint(String uri) {
- return (Endpoint)map.get(uri);
- }
-
- public List<EndpointReference> getEndpointReferences() {
- return endpointreferences;
- }
-
- public List<Endpoint> getEndpoints() {
- return new ArrayList(map.values());
- }
-
- public List<EndpointListener> getListeners() {
- return listeners;
- }
-
- public void removeEndpoint(Endpoint endpoint) {
- map.remove(endpoint.getURI());
- logger.info("Remove endpoint - " + endpoint);
- }
-
- public void removeEndpointReference(EndpointReference endpointReference) {
- endpointreferences.remove(endpointReference);
- logger.fine("Remove endpoint reference - " + endpointReference);
- }
-
- public void removeListener(EndpointListener listener) {
- listeners.remove(listener);
- }
-
- public void updateEndpoint(String uri, Endpoint endpoint) {
-// // TODO: is updateEndpoint needed?
-// throw new UnsupportedOperationException();
- }
-
-// public void entryAdded(Object key, Object value) {
-// MapEntry entry = (MapEntry)value;
-// Endpoint newEp = (Endpoint)entry.getValue();
-// if (!isLocal(entry)) {
-// logger.info(id + " Remote endpoint added: " + entry.getValue());
-// newEp.setRemote(true);
-// }
-// ((RuntimeEndpoint) newEp).bind(registry, this);
-// for (EndpointListener listener : listeners) {
-// listener.endpointAdded(newEp);
-// }
-// }
-//
-// public void entryRemoved(Object key, Object value) {
-// MapEntry entry = (MapEntry)value;
-// if (!isLocal(entry)) {
-// logger.info(id + " Remote endpoint removed: " + entry.getValue());
-// }
-// Endpoint oldEp = (Endpoint)entry.getValue();
-// for (EndpointListener listener : listeners) {
-// listener.endpointRemoved(oldEp);
-// }
-// }
-//
-// public void entryUpdated(Object key, Object oldValue, Object newValue) {
-// MapEntry oldEntry = (MapEntry)oldValue;
-// MapEntry newEntry = (MapEntry)newValue;
-// if (!isLocal(newEntry)) {
-// logger.info(id + " Remote endpoint updated: " + newEntry.getValue());
-// }
-// Endpoint oldEp = (Endpoint)oldEntry.getValue();
-// Endpoint newEp = (Endpoint)newEntry.getValue();
-// ((RuntimeEndpoint) newEp).bind(registry, this);
-// for (EndpointListener listener : listeners) {
-// listener.endpointUpdated(oldEp, newEp);
-// }
-// }
-
- private static String getBindAddress() {
- try {
- Enumeration<NetworkInterface> nis = NetworkInterface.getNetworkInterfaces();
- while (nis.hasMoreElements()) {
- NetworkInterface ni = nis.nextElement();
- // The following APIs require JDK 1.6
- /*
- if (ni.isLoopback() || !ni.isUp() || !ni.supportsMulticast()) {
- continue;
- }
- */
- Enumeration<InetAddress> ips = ni.getInetAddresses();
- if (!ips.hasMoreElements()) {
- continue;
- }
- while (ips.hasMoreElements()) {
- InetAddress addr = ips.nextElement();
- if (addr.isLoopbackAddress()) {
- continue;
- }
- return addr.getHostAddress();
- }
- }
- return InetAddress.getLocalHost().getHostAddress();
- } catch (Exception e) {
- logger.log(Level.SEVERE, e.getMessage(), e);
- return null;
- }
- }
-
-}
diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.EndpointRegistry b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.EndpointRegistry
index 626935e71e..1983881ce1 100644
--- a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.EndpointRegistry
+++ b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.EndpointRegistry
@@ -14,5 +14,5 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-org.apache.tuscany.sca.endpoint.hazelcast.HazelcastRegistry;ranking=150,address=228.0.0.100,port=50000,timeout=50,scheme=tuscany
+org.apache.tuscany.sca.endpoint.hazelcast.HazelcastEndpointRegistry;ranking=150,scheme=tuscany
diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/ConfigURITestCase.java b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/ConfigURITestCase.java
new file mode 100644
index 0000000000..85821afe39
--- /dev/null
+++ b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/ConfigURITestCase.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.hazelcast;
+
+import java.net.UnknownHostException;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+public class ConfigURITestCase {
+
+ @Test
+ public void testInvalidPrefix() throws UnknownHostException {
+ try {
+ new ConfigURI("foo");
+ Assert.fail();
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testDomainName() throws UnknownHostException {
+ ConfigURI configURI = new ConfigURI("tuscany:myDomain");
+ Assert.assertEquals("myDomain", configURI.getDomainName());
+ Assert.assertFalse(configURI.isMulticastDisabled());
+ }
+
+ @Test
+ public void testListenAddr() throws UnknownHostException {
+ ConfigURI configURI = new ConfigURI("tuscany:myDomain?listen=4321");
+ Assert.assertEquals("myDomain", configURI.getDomainName());
+ Assert.assertFalse(configURI.isMulticastDisabled());
+ Assert.assertEquals(4321, configURI.getListenPort());
+ Assert.assertNull(configURI.getBindAddress());
+ }
+ @Test
+ public void testListenAddr2() throws UnknownHostException {
+ ConfigURI configURI = new ConfigURI("tuscany:myDomain?listen=1.1.1.1:4321");
+ Assert.assertEquals("myDomain", configURI.getDomainName());
+ Assert.assertFalse(configURI.isMulticastDisabled());
+ Assert.assertEquals(4321, configURI.getListenPort());
+ Assert.assertEquals("1.1.1.1", configURI.getBindAddress());
+ }
+
+ @Test
+ public void testMulticase1() throws UnknownHostException {
+ ConfigURI configURI = new ConfigURI("tuscany:myDomain?multicast=off");
+ Assert.assertEquals("myDomain", configURI.getDomainName());
+ Assert.assertTrue(configURI.isMulticastDisabled());
+ }
+
+ @Test
+ public void testMulticase2() throws UnknownHostException {
+ ConfigURI configURI = new ConfigURI("tuscany:myDomain?multicast=1.2.3.4:67");
+ Assert.assertEquals("myDomain", configURI.getDomainName());
+ Assert.assertFalse(configURI.isMulticastDisabled());
+ Assert.assertEquals("1.2.3.4", configURI.getMulticastAddress());
+ Assert.assertEquals(67, configURI.getMulticastPort());
+ }
+
+ @Test
+ public void testMulticase3() throws UnknownHostException {
+ ConfigURI configURI = new ConfigURI("tuscany:myDomain?multicast=1.2.3.4");
+ Assert.assertEquals("myDomain", configURI.getDomainName());
+ Assert.assertFalse(configURI.isMulticastDisabled());
+ Assert.assertEquals("1.2.3.4", configURI.getMulticastAddress());
+ Assert.assertEquals(51482, configURI.getMulticastPort());
+ }
+
+ @Test
+ public void testPassword() {
+ ConfigURI configURI = new ConfigURI("tuscany:myDomain?password=bla");
+ Assert.assertEquals("myDomain", configURI.getDomainName());
+ Assert.assertEquals("bla", configURI.getPassword());
+ }
+
+ @Test
+ public void testRemotes() throws UnknownHostException {
+ ConfigURI configURI = new ConfigURI("tuscany:myDomain?remotes=1.1.1.1:23,2.2.2.2");
+ Assert.assertEquals("myDomain", configURI.getDomainName());
+ Assert.assertTrue(configURI.isMulticastDisabled());
+ Assert.assertEquals(2, configURI.getRemotes().size());
+ Assert.assertEquals("1.1.1.1:23", configURI.getRemotes().get(0));
+ Assert.assertEquals("2.2.2.2:14820", configURI.getRemotes().get(1));
+ }
+
+}
diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/RegistryTestCase.java b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/RegistryTestCase.java
index 01052fa0f5..c4021f3d4f 100644
--- a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/RegistryTestCase.java
+++ b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/RegistryTestCase.java
@@ -25,6 +25,7 @@ import java.util.List;
import junit.framework.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import com.hazelcast.config.Config;
@@ -35,6 +36,7 @@ import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.nio.Address;
+@Ignore
public class RegistryTestCase {
@Test