summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java42
1 files changed, 42 insertions, 0 deletions
diff --git a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
index c902323d00..823ecd51d0 100644
--- a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
+++ b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
@@ -19,12 +19,16 @@
package org.apache.tuscany.sca.endpoint.tribes;
+import java.io.IOException;
import java.net.InetAddress;
+import java.net.MalformedURLException;
import java.net.NetworkInterface;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
+import java.util.StringTokenizer;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Level;
@@ -32,8 +36,14 @@ import java.util.logging.Logger;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelInterceptor;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.ChannelCoordinator;
import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
+import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
import org.apache.catalina.tribes.membership.McastService;
+import org.apache.catalina.tribes.membership.StaticMember;
import org.apache.tuscany.sca.assembly.Endpoint;
import org.apache.tuscany.sca.assembly.EndpointReference;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
@@ -65,6 +75,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
private ExtensionPointRegistry registry;
private ReplicatedMap map;
+ private static List<URL> staticRoutes;
private static final Channel createChannel(String address, int port, String bindAddress) {
@@ -117,6 +128,19 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
if (timeoutStr != null) {
timeout = Integer.parseInt(timeoutStr);
}
+
+ String routesStr = attributes.get("routes");
+ if (routesStr != null) {
+ StringTokenizer st = new StringTokenizer(routesStr);
+ staticRoutes = new ArrayList<URL>();
+ while (st.hasMoreElements()) {
+ try {
+ staticRoutes.add(new URL("http://" + st.nextToken()));
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
public ReplicatedEndpointRegistry(String domainURI) {
@@ -137,6 +161,24 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
} catch (ChannelException e) {
throw new IllegalStateException(e);
}
+
+ if (staticRoutes != null) {
+ StaticMembershipInterceptor smi = new StaticMembershipInterceptor();
+ for (URL staticRoute : staticRoutes) {
+ Member member;
+ try {
+ member = new StaticMember(staticRoute.getHost(), staticRoute.getPort(), 5000);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ smi.addStaticMember(member);
+ logger.info("Added static route: " + staticRoute.getHost() + ":" + staticRoute.getPort());
+ }
+ smi.setLocalMember(map.getChannel().getLocalMember(false));
+ map.getChannel().addInterceptor(smi);
+ }
+
+
}
public void stop() {