summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
diff options
context:
space:
mode:
Diffstat (limited to 'sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java')
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java178
1 files changed, 18 insertions, 160 deletions
diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
index fc16db7a74..3c2a062de2 100644
--- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
+++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
@@ -31,8 +31,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
-import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -46,23 +44,23 @@ import org.apache.catalina.tribes.membership.McastService;
import org.apache.catalina.tribes.membership.StaticMember;
import org.apache.catalina.tribes.transport.ReceiverBase;
import org.apache.tuscany.sca.assembly.Endpoint;
-import org.apache.tuscany.sca.assembly.EndpointReference;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.LifeCycleListener;
import org.apache.tuscany.sca.endpoint.tribes.AbstractReplicatedMap.MapEntry;
import org.apache.tuscany.sca.endpoint.tribes.MapStore.MapListener;
-import org.apache.tuscany.sca.runtime.EndpointListener;
+import org.apache.tuscany.sca.runtime.BaseEndpointRegistry;
import org.apache.tuscany.sca.runtime.EndpointRegistry;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
/**
* A replicated EndpointRegistry based on Apache Tomcat Tribes
*/
-public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleListener, MapListener {
+public class ReplicatedEndpointRegistry extends BaseEndpointRegistry implements EndpointRegistry, LifeCycleListener,
+ MapListener {
private final static Logger logger = Logger.getLogger(ReplicatedEndpointRegistry.class.getName());
private static final String MULTICAST_ADDRESS = "228.0.0.100";
private static final int MULTICAST_PORT = 50000;
-
+
private static final int FIND_REPEAT_COUNT = 10;
private int port = MULTICAST_PORT;
@@ -73,12 +71,6 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
private int receiverAutoBind = 100;
private List<URI> staticRoutes;
- private final static String DEFAULT_DOMAIN_URI = "http://tuscany.apache.org/sca/1.1/domains/default";
- private String domainURI = DEFAULT_DOMAIN_URI;
- private List<EndpointReference> endpointreferences = new CopyOnWriteArrayList<EndpointReference>();
- private List<EndpointListener> listeners = new CopyOnWriteArrayList<EndpointListener>();
-
- private ExtensionPointRegistry registry;
private ReplicatedMap map;
private String id;
@@ -101,7 +93,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
} else {
mcastService.setBind(getBindAddress());
}
-
+
return channel;
}
@@ -109,9 +101,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
Map<String, String> attributes,
String domainRegistryURI,
String domainURI) {
- this.registry = registry;
- this.domainURI = domainURI;
- this.id = "[" + System.identityHashCode(this) + "]";
+ super(registry, attributes, domainRegistryURI, domainURI);
getParameters(attributes, domainRegistryURI);
}
@@ -220,7 +210,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
((AbstractSender)sender).setMaxRetryAttempts(5);
}
*/
-
+
if (staticRoutes != null) {
StaticMembershipInterceptor smi = new StaticMembershipInterceptor();
for (URI staticRoute : staticRoutes) {
@@ -237,7 +227,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
smi.setLocalMember(map.getChannel().getLocalMember(false));
map.getChannel().addInterceptor(smi);
}
-
+
try {
map.getChannel().start(Channel.DEFAULT);
} catch (ChannelException e) {
@@ -265,78 +255,14 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
logger.info("Add endpoint - " + endpoint);
}
- public void addEndpointReference(EndpointReference endpointReference) {
- endpointreferences.add(endpointReference);
- logger.fine("Add endpoint reference - " + endpointReference);
- }
-
- public void addListener(EndpointListener listener) {
- listeners.add(listener);
- }
-
- /**
- * Parse the component/service/binding URI into an array of parts (componentURI, serviceName, bindingName)
- * @param uri
- * @return
- */
- private String[] parse(String uri) {
- String[] names = new String[3];
- int index = uri.lastIndexOf('#');
- if (index == -1) {
- names[0] = uri;
- } else {
- names[0] = uri.substring(0, index);
- String str = uri.substring(index + 1);
- if (str.startsWith("service-binding(") && str.endsWith(")")) {
- str = str.substring("service-binding(".length(), str.length() - 1);
- String[] parts = str.split("/");
- if (parts.length != 2) {
- throw new IllegalArgumentException("Invalid service-binding URI: " + uri);
- }
- names[1] = parts[0];
- names[2] = parts[1];
- } else if (str.startsWith("service(") && str.endsWith(")")) {
- str = str.substring("service(".length(), str.length() - 1);
- names[1] = str;
- } else {
- throw new IllegalArgumentException("Invalid component/service/binding URI: " + uri);
- }
- }
- return names;
- }
-
- private boolean matches(String target, String uri) {
- String[] parts1 = parse(target);
- String[] parts2 = parse(uri);
- for (int i = 0; i < parts1.length; i++) {
- if (parts1[i] == null || parts1[i].equals(parts2[i])) {
- continue;
- } else {
- return false;
- }
- }
- return true;
- }
-
- public List<Endpoint> findEndpoint(EndpointReference endpointReference) {
- logger.fine("Find endpoint for reference - " + endpointReference);
-
- if (endpointReference.getReference() != null) {
- Endpoint targetEndpoint = endpointReference.getTargetEndpoint();
- return findEndpoint(targetEndpoint.getURI());
- }
-
- return new ArrayList<Endpoint>();
- }
-
public List<Endpoint> findEndpoint(String uri) {
List<Endpoint> foundEndpoints = new ArrayList<Endpoint>();
// in the failure case we repeat the look up after a short
// delay to take account of tribes replication delays
int repeat = FIND_REPEAT_COUNT;
-
- while (repeat > 0){
+
+ while (repeat > 0) {
for (Object v : map.values()) {
Endpoint endpoint = (Endpoint)v;
// TODO: implement more complete matching
@@ -347,15 +273,15 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
endpoint.setRemote(true);
}
// if (!entry.isPrimary()) {
- ((RuntimeEndpoint) endpoint).bind(registry, this);
+ ((RuntimeEndpoint)endpoint).bind(registry, this);
// }
foundEndpoints.add(endpoint);
logger.fine("Found endpoint with matching service - " + endpoint);
repeat = 0;
- }
+ }
// else the service name doesn't match
}
-
+
if (foundEndpoints.size() == 0) {
// the service name doesn't match any endpoints so wait a little and try
// again in case this is caused by tribes synch delays
@@ -363,9 +289,9 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
repeat--;
try {
Thread.sleep(1000);
- } catch(Exception ex){
+ } catch (Exception ex) {
// do nothing
- repeat=0;
+ repeat = 0;
}
}
}
@@ -373,45 +299,23 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
return foundEndpoints;
}
-
private boolean isLocal(MapEntry entry) {
return entry.getPrimary().equals(map.getChannel().getLocalMember(false));
}
- public List<EndpointReference> findEndpointReference(Endpoint endpoint) {
- return endpointreferences;
- }
-
public Endpoint getEndpoint(String uri) {
return (Endpoint)map.get(uri);
}
- public List<EndpointReference> getEndpointReferences() {
- return endpointreferences;
- }
-
public List<Endpoint> getEndpoints() {
return new ArrayList(map.values());
}
- public List<EndpointListener> getListeners() {
- return listeners;
- }
-
public void removeEndpoint(Endpoint endpoint) {
map.remove(endpoint.getURI());
logger.info("Remove endpoint - " + endpoint);
}
- public void removeEndpointReference(EndpointReference endpointReference) {
- endpointreferences.remove(endpointReference);
- logger.fine("Remove endpoint reference - " + endpointReference);
- }
-
- public void removeListener(EndpointListener listener) {
- listeners.remove(listener);
- }
-
public void replicate(boolean complete) {
map.replicate(complete);
}
@@ -431,10 +335,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
logger.info(id + " Remote endpoint added: " + entry.getValue());
newEp.setRemote(true);
}
- ((RuntimeEndpoint) newEp).bind(registry, this);
- for (EndpointListener listener : listeners) {
- listener.endpointAdded(newEp);
- }
+ endpointAdded(newEp);
}
public void entryRemoved(Object key, Object value) {
@@ -442,11 +343,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
if (!isLocal(entry)) {
logger.info(id + " Remote endpoint removed: " + entry.getValue());
}
- Endpoint oldEp = (Endpoint)entry.getValue();
- ((RuntimeEndpoint) oldEp).bind(registry, this);
- for (EndpointListener listener : listeners) {
- listener.endpointRemoved(oldEp);
- }
+ endpointRemoved((Endpoint)entry.getValue());
}
public void entryUpdated(Object key, Object oldValue, Object newValue) {
@@ -457,46 +354,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
}
Endpoint oldEp = (Endpoint)oldEntry.getValue();
Endpoint newEp = (Endpoint)newEntry.getValue();
- ((RuntimeEndpoint) newEp).bind(registry, this);
- for (EndpointListener listener : listeners) {
- listener.endpointUpdated(oldEp, newEp);
- }
- }
-
- public static void main(String[] args) throws Exception {
- //create a channel
- GroupChannel channel = new GroupChannel();
- McastService mcastService = (McastService)channel.getMembershipService();
- mcastService.setPort(MULTICAST_PORT);
- mcastService.setAddress(MULTICAST_ADDRESS);
-
-
-// ChannelReceiver rcv = channel.getChannelReceiver();
-// ReceiverBase rcvb = (ReceiverBase)rcv;
-// rcvb.setPort(10480);
-
- InetAddress localhost = InetAddress.getLocalHost();
-
- // REVIEW: In my case, there are multiple IP addresses
- // One for the WIFI and the other one for VPN. For some reason the VPN one doesn't support
- // Multicast
-
- // You can use "route add 228.0.0.0 mask 252.0.0.0 192.168.1.100"
- mcastService.setBind(getBindAddress());
- channel.start(Channel.DEFAULT);
- ReplicatedMap map = new ReplicatedMap(null, channel, 50, "01", null);
- map.put(UUID.randomUUID().toString(), localhost.getHostAddress());
- for (int i = 0; i < 4; i++) {
- Thread.sleep(3000);
- System.out.println(localhost + ": " + map.keySet());
- }
- for (Object e : map.entrySetFull()) {
- Map.Entry en = (Map.Entry)e;
- AbstractReplicatedMap.MapEntry entry = (AbstractReplicatedMap.MapEntry)en.getValue();
- System.out.println(entry);
- }
- map.breakdown();
- channel.stop(Channel.DEFAULT);
+ endpointUpdated(oldEp, newEp);
}
private static String getBindAddress() {