summaryrefslogtreecommitdiffstats
path: root/java/sca/modules/endpoint-dht
diff options
context:
space:
mode:
Diffstat (limited to 'java/sca/modules/endpoint-dht')
-rw-r--r--java/sca/modules/endpoint-dht/pom.xml10
-rw-r--r--java/sca/modules/endpoint-dht/src/main/java/org/apache/tuscany/sca/endpoint/dht/OverlayEndpointRegistry.java199
2 files changed, 116 insertions, 93 deletions
diff --git a/java/sca/modules/endpoint-dht/pom.xml b/java/sca/modules/endpoint-dht/pom.xml
index 8c62c5dde6..90aa3d6799 100644
--- a/java/sca/modules/endpoint-dht/pom.xml
+++ b/java/sca/modules/endpoint-dht/pom.xml
@@ -29,13 +29,13 @@
<name>Apache Tuscany SCA OverlayWeaver DHT Based EndPoint Registry</name>
<dependencies>
- <!-- <dependency>
- <groupId>org.apache.tomcat</groupId>
- <artifactId>overlay</artifactId>
- <version>6.0.18</version>
+ <dependency>
+ <groupId>ow</groupId>
+ <artifactId>overlayweaver</artifactId>
+ <version>0.9.5</version>
<scope>compile</scope>
</dependency>
- -->
+
<dependency>
<groupId>org.apache.tuscany.sca</groupId>
<artifactId>tuscany-core-spi</artifactId>
diff --git a/java/sca/modules/endpoint-dht/src/main/java/org/apache/tuscany/sca/endpoint/dht/OverlayEndpointRegistry.java b/java/sca/modules/endpoint-dht/src/main/java/org/apache/tuscany/sca/endpoint/dht/OverlayEndpointRegistry.java
index 6cf955bd70..04b260de0f 100644
--- a/java/sca/modules/endpoint-dht/src/main/java/org/apache/tuscany/sca/endpoint/dht/OverlayEndpointRegistry.java
+++ b/java/sca/modules/endpoint-dht/src/main/java/org/apache/tuscany/sca/endpoint/dht/OverlayEndpointRegistry.java
@@ -28,13 +28,16 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import ow.id.ID;
import ow.routing.RoutingException;
import ow.dht.DHT;
import ow.dht.DHTConfiguration;
-
+import ow.dht.DHTFactory;
+import ow.dht.ValueInfo;
+import java.io.IOException;
import org.apache.tuscany.sca.assembly.Endpoint;
import org.apache.tuscany.sca.assembly.EndpointReference;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
@@ -58,7 +61,7 @@ public class OverlayEndpointRegistry implements EndpointRegistry, LifeCycleListe
private ExtensionPointRegistry registry;
private String joinPort= null;
private String address = null;
-
+ private int port;
public OverlayEndpointRegistry(ExtensionPointRegistry registry, Map<String, String> attributes) {
this.registry = registry;
@@ -67,7 +70,7 @@ public class OverlayEndpointRegistry implements EndpointRegistry, LifeCycleListe
port = DEFAULT_PORT;
}
/* This is the address that you need for join a DHT */
- String address = attributes.get("address");
+ address = attributes.get("address");
if (address == null) {
address = getBindAddress();
}
@@ -83,23 +86,23 @@ public class OverlayEndpointRegistry implements EndpointRegistry, LifeCycleListe
throw new IllegalStateException("The registry has already been started");
}
/* here you have to join the DHT */
- DHTConfiguration config = DHTFactory.getDefaultConfiguration();
- /* the DHT behaviour should be configurable */
- config.setRoutingStyle("Iterative");
- config.setRoutingAlgorithm("Pastry");
- config.setSelfPort(DEFAULT_PORT);
- try {
- map = DHTFactory.<Endpoint>getDHT(config);
- }
- catch (Exception e) {
- throw new IllegalStateException(e);
- }
- try {
- map.joinOverlay(address, joinPort);
- }
- catch (IOException e) {
- throw new IllegalStateException(e);
- }
+ DHTConfiguration config = DHTFactory.getDefaultConfiguration();
+ /* the DHT behaviour should be configurable */
+ config.setRoutingStyle("Iterative");
+ config.setRoutingAlgorithm("Pastry");
+ config.setSelfPort(DEFAULT_PORT);
+ try {
+ map = DHTFactory.<Endpoint>getDHT(config);
+ }
+ catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ try {
+ map.joinOverlay(address, Integer.parseInt(joinPort));
+ }
+ catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
}
@@ -112,9 +115,15 @@ public class OverlayEndpointRegistry implements EndpointRegistry, LifeCycleListe
public void addEndpoint(Endpoint endpoint) {
int idSize = map.getRoutingAlgorithmConfiguration().getIDSizeInByte();
- ID key = ID.getHashcodeBasedID(endpoint.getURI(), idSize);
- map.put(key, endpoint);
- publishedEndpoints.put(endpoint.getURI, endpoint);
+ ID key = ID.getHashcodeBasedID(endpoint.getURI(), idSize);
+ try {
+ map.put(key, endpoint);
+
+ }
+ catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ publishedEndpoints.put(endpoint.getURI(), endpoint);
logger.info("Add endpoint - " + endpoint);
}
@@ -179,126 +188,140 @@ public class OverlayEndpointRegistry implements EndpointRegistry, LifeCycleListe
if (endpointReference.getReference() != null) {
Endpoint targetEndpoint = endpointReference.getTargetEndpoint();
int idSize = map.getRoutingAlgorithmConfiguration().getIDSizeInByte();
- ID key = ID.getHashcodeBasedID(targetEndpoint.getURI(), idSize);
- logger.info("Matching against - " + endpoint);
- Set<ValueInfo<Endpoint>> values = map.get(key);
- for (ValueInfo<Endpoint>v : values)
- {
- Endpoint endpoint = v.getValue();
- if (matches(targetEndpoint.getURI(), endpoint.getURI())) {
+ ID key = ID.getHashcodeBasedID(targetEndpoint.getURI(), idSize);
+ try {
+ Set<ValueInfo<Endpoint>> values = map.get(key);
+ for (ValueInfo<Endpoint>v : values)
+ {
+ Endpoint endpoint = v.getValue();
+ if (matches(targetEndpoint.getURI(), endpoint.getURI())) {
- if (!isLocal(v)) {
- endpoint.setRemote(true);
- }
+ if (!isLocal(endpoint)) {
+ endpoint.setRemote(true);
+ }
- endpoint.setExtensionPointRegistry(registry);
+ endpoint.setExtensionPointRegistry(registry);
- foundEndpoints.add(endpoint);
- logger.info("Found endpoint with matching service - " + endpoint);
- }
+ foundEndpoints.add(endpoint);
+ logger.info("Found endpoint with matching service - " + endpoint);
+ }
- }
+ }
- }
- return foundEndpoints;
+ } catch (Exception e)
+ {
+ throw new IllegalStateException("Routing exception during resolving endpoint");
+ }
+ }
+
+ return foundEndpoints;
}
private boolean isLocal(Endpoint entry) {
Endpoint local;
- local = publishedEndpoint.get(entry.getURI());
+ local = publishedEndpoints.get(entry.getURI());
if (local != null)
return true;
return false;
}
public List<EndpointReference> findEndpointReference(Endpoint endpoint) {
- return endpointreferences;
+ return endpointreferences;
}
public Endpoint getEndpoint(String uri) {
/* if is local there no need to go on the net*/
- Endpoint local;
- local = publishedEndpoint.get(uri);
+ Endpoint local = null;
+ local = publishedEndpoints.get(uri);
if (local != null)
return local;
/* otherwise we should check on the net */
int idSize = map.getRoutingAlgorithmConfiguration().getIDSizeInByte();
- ID key = ID.getHashcodeBasedID(uri, idSize);
+ ID key = ID.getHashcodeBasedID(uri, idSize);
+ try {
Set<ValueInfo<Endpoint>> values = map.get(key);
- return (Endpoint)map.get(uri);
+ for(ValueInfo <Endpoint> v: values)
+ {
+ return v.getValue();
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException("Routing exception during resolving endpoint");
+ }
+ return local;
}
public List<EndpointReference> getEndpointRefereneces() {
- return endpointreferences;
+ return endpointreferences;
}
public List<Endpoint> getEndpoints() {
- /*TODO*/
- }
+ /*TODO*/
+ return null;
+ }
public List<EndpointListener> getListeners() {
- return listeners;
+ return listeners;
}
public void removeEndpoint(Endpoint endpoint) {
- /*TODO*/
- publishedEndpoint.remove(endpoint.getURI());
- logger.info("Remove endpoint - " + endpoint);
+ /*TODO*/
+ publishedEndpoints.remove(endpoint.getURI());
+ logger.info("Remove endpoint - " + endpoint);
}
public void removeEndpointReference(EndpointReference endpointReference) {
- endpointreferences.remove(endpointReference);
- logger.info("Remove endpoint reference - " + endpointReference);
+ endpointreferences.remove(endpointReference);
+ logger.info("Remove endpoint reference - " + endpointReference);
}
public void removeListener(EndpointListener listener) {
- listeners.remove(listener);
+ listeners.remove(listener);
}
public void updateEndpoint(String uri, Endpoint endpoint) {
- /* TODO*/
+ /* TODO*/
}
public void entryAdded(Object key, Object value) {
- /* TODO*/
+ /* TODO*/
}
public void entryRemoved(Object key, Object value) {
- /* TODO*/
- }
+ /* TODO*/
+ }
public void entryUpdated(Object key, Object oldValue, Object newValue) {
- /* TODO*/
+ /* TODO*/
}
private static String getBindAddress() {
- try {
- Enumeration<NetworkInterface> nis = NetworkInterface.getNetworkInterfaces();
- while (nis.hasMoreElements()) {
- NetworkInterface ni = nis.nextElement();
- // The following APIs require JDK 1.6
- /*
- if (ni.isLoopback() || !ni.isUp() || !ni.supportsMulticast()) {
- continue;
- }
- */
- Enumeration<InetAddress> ips = ni.getInetAddresses();
- if (!ips.hasMoreElements()) {
- continue;
- }
- while (ips.hasMoreElements()) {
- InetAddress addr = ips.nextElement();
- if (addr.isLoopbackAddress()) {
- continue;
- }
- return addr.getHostAddress();
- }
- }
- return InetAddress.getLocalHost().getHostAddress();
- } catch (Exception e) {
- logger.log(Level.SEVERE, e.getMessage(), e);
- return null;
- }
+ try {
+ Enumeration<NetworkInterface> nis = NetworkInterface.getNetworkInterfaces();
+ while (nis.hasMoreElements()) {
+ NetworkInterface ni = nis.nextElement();
+ // The following APIs require JDK 1.6
+ /*
+ if (ni.isLoopback() || !ni.isUp() || !ni.supportsMulticast()) {
+ continue;
+ }
+ */
+ Enumeration<InetAddress> ips = ni.getInetAddresses();
+ if (!ips.hasMoreElements()) {
+ continue;
+ }
+ while (ips.hasMoreElements()) {
+ InetAddress addr = ips.nextElement();
+ if (addr.isLoopbackAddress()) {
+ continue;
+ }
+ return addr.getHostAddress();
+ }
+ }
+ return InetAddress.getLocalHost().getHostAddress();
+ } catch (Exception e) {
+ logger.log(Level.SEVERE, e.getMessage(), e);
+ return null;
+ }
}
}