From 40aa7960a748c80a2465d5ca78e8f91d43296c8e Mon Sep 17 00:00:00 2001 From: rfeng Date: Fri, 29 Jan 2010 03:34:21 +0000 Subject: Refactor the DomainRegistryFactory to be extensions of DomainRegistryFactoryExtensionPoint git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@904367 13f79535-47bb-0310-9956-ffa450edef68 --- .../tribes/ReplicatedEndpointRegistry.java | 178 +++------------------ .../tribes/TribesDomainRegistryFactory.java | 49 ++++++ 2 files changed, 67 insertions(+), 160 deletions(-) create mode 100644 sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/TribesDomainRegistryFactory.java (limited to 'sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org') diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java index fc16db7a74..3c2a062de2 100644 --- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java +++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java @@ -31,8 +31,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.StringTokenizer; -import java.util.UUID; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.logging.Level; import java.util.logging.Logger; @@ -46,23 +44,23 @@ import org.apache.catalina.tribes.membership.McastService; import org.apache.catalina.tribes.membership.StaticMember; import org.apache.catalina.tribes.transport.ReceiverBase; import org.apache.tuscany.sca.assembly.Endpoint; -import org.apache.tuscany.sca.assembly.EndpointReference; import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.core.LifeCycleListener; import org.apache.tuscany.sca.endpoint.tribes.AbstractReplicatedMap.MapEntry; import org.apache.tuscany.sca.endpoint.tribes.MapStore.MapListener; -import org.apache.tuscany.sca.runtime.EndpointListener; +import org.apache.tuscany.sca.runtime.BaseEndpointRegistry; import org.apache.tuscany.sca.runtime.EndpointRegistry; import org.apache.tuscany.sca.runtime.RuntimeEndpoint; /** * A replicated EndpointRegistry based on Apache Tomcat Tribes */ -public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleListener, MapListener { +public class ReplicatedEndpointRegistry extends BaseEndpointRegistry implements EndpointRegistry, LifeCycleListener, + MapListener { private final static Logger logger = Logger.getLogger(ReplicatedEndpointRegistry.class.getName()); private static final String MULTICAST_ADDRESS = "228.0.0.100"; private static final int MULTICAST_PORT = 50000; - + private static final int FIND_REPEAT_COUNT = 10; private int port = MULTICAST_PORT; @@ -73,12 +71,6 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi private int receiverAutoBind = 100; private List staticRoutes; - private final static String DEFAULT_DOMAIN_URI = "http://tuscany.apache.org/sca/1.1/domains/default"; - private String domainURI = DEFAULT_DOMAIN_URI; - private List endpointreferences = new CopyOnWriteArrayList(); - private List listeners = new CopyOnWriteArrayList(); - - private ExtensionPointRegistry registry; private ReplicatedMap map; private String id; @@ -101,7 +93,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi } else { mcastService.setBind(getBindAddress()); } - + return channel; } @@ -109,9 +101,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi Map attributes, String domainRegistryURI, String domainURI) { - this.registry = registry; - this.domainURI = domainURI; - this.id = "[" + System.identityHashCode(this) + "]"; + super(registry, attributes, domainRegistryURI, domainURI); getParameters(attributes, domainRegistryURI); } @@ -220,7 +210,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi ((AbstractSender)sender).setMaxRetryAttempts(5); } */ - + if (staticRoutes != null) { StaticMembershipInterceptor smi = new StaticMembershipInterceptor(); for (URI staticRoute : staticRoutes) { @@ -237,7 +227,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi smi.setLocalMember(map.getChannel().getLocalMember(false)); map.getChannel().addInterceptor(smi); } - + try { map.getChannel().start(Channel.DEFAULT); } catch (ChannelException e) { @@ -265,78 +255,14 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi logger.info("Add endpoint - " + endpoint); } - public void addEndpointReference(EndpointReference endpointReference) { - endpointreferences.add(endpointReference); - logger.fine("Add endpoint reference - " + endpointReference); - } - - public void addListener(EndpointListener listener) { - listeners.add(listener); - } - - /** - * Parse the component/service/binding URI into an array of parts (componentURI, serviceName, bindingName) - * @param uri - * @return - */ - private String[] parse(String uri) { - String[] names = new String[3]; - int index = uri.lastIndexOf('#'); - if (index == -1) { - names[0] = uri; - } else { - names[0] = uri.substring(0, index); - String str = uri.substring(index + 1); - if (str.startsWith("service-binding(") && str.endsWith(")")) { - str = str.substring("service-binding(".length(), str.length() - 1); - String[] parts = str.split("/"); - if (parts.length != 2) { - throw new IllegalArgumentException("Invalid service-binding URI: " + uri); - } - names[1] = parts[0]; - names[2] = parts[1]; - } else if (str.startsWith("service(") && str.endsWith(")")) { - str = str.substring("service(".length(), str.length() - 1); - names[1] = str; - } else { - throw new IllegalArgumentException("Invalid component/service/binding URI: " + uri); - } - } - return names; - } - - private boolean matches(String target, String uri) { - String[] parts1 = parse(target); - String[] parts2 = parse(uri); - for (int i = 0; i < parts1.length; i++) { - if (parts1[i] == null || parts1[i].equals(parts2[i])) { - continue; - } else { - return false; - } - } - return true; - } - - public List findEndpoint(EndpointReference endpointReference) { - logger.fine("Find endpoint for reference - " + endpointReference); - - if (endpointReference.getReference() != null) { - Endpoint targetEndpoint = endpointReference.getTargetEndpoint(); - return findEndpoint(targetEndpoint.getURI()); - } - - return new ArrayList(); - } - public List findEndpoint(String uri) { List foundEndpoints = new ArrayList(); // in the failure case we repeat the look up after a short // delay to take account of tribes replication delays int repeat = FIND_REPEAT_COUNT; - - while (repeat > 0){ + + while (repeat > 0) { for (Object v : map.values()) { Endpoint endpoint = (Endpoint)v; // TODO: implement more complete matching @@ -347,15 +273,15 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi endpoint.setRemote(true); } // if (!entry.isPrimary()) { - ((RuntimeEndpoint) endpoint).bind(registry, this); + ((RuntimeEndpoint)endpoint).bind(registry, this); // } foundEndpoints.add(endpoint); logger.fine("Found endpoint with matching service - " + endpoint); repeat = 0; - } + } // else the service name doesn't match } - + if (foundEndpoints.size() == 0) { // the service name doesn't match any endpoints so wait a little and try // again in case this is caused by tribes synch delays @@ -363,9 +289,9 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi repeat--; try { Thread.sleep(1000); - } catch(Exception ex){ + } catch (Exception ex) { // do nothing - repeat=0; + repeat = 0; } } } @@ -373,45 +299,23 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi return foundEndpoints; } - private boolean isLocal(MapEntry entry) { return entry.getPrimary().equals(map.getChannel().getLocalMember(false)); } - public List findEndpointReference(Endpoint endpoint) { - return endpointreferences; - } - public Endpoint getEndpoint(String uri) { return (Endpoint)map.get(uri); } - public List getEndpointReferences() { - return endpointreferences; - } - public List getEndpoints() { return new ArrayList(map.values()); } - public List getListeners() { - return listeners; - } - public void removeEndpoint(Endpoint endpoint) { map.remove(endpoint.getURI()); logger.info("Remove endpoint - " + endpoint); } - public void removeEndpointReference(EndpointReference endpointReference) { - endpointreferences.remove(endpointReference); - logger.fine("Remove endpoint reference - " + endpointReference); - } - - public void removeListener(EndpointListener listener) { - listeners.remove(listener); - } - public void replicate(boolean complete) { map.replicate(complete); } @@ -431,10 +335,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi logger.info(id + " Remote endpoint added: " + entry.getValue()); newEp.setRemote(true); } - ((RuntimeEndpoint) newEp).bind(registry, this); - for (EndpointListener listener : listeners) { - listener.endpointAdded(newEp); - } + endpointAdded(newEp); } public void entryRemoved(Object key, Object value) { @@ -442,11 +343,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi if (!isLocal(entry)) { logger.info(id + " Remote endpoint removed: " + entry.getValue()); } - Endpoint oldEp = (Endpoint)entry.getValue(); - ((RuntimeEndpoint) oldEp).bind(registry, this); - for (EndpointListener listener : listeners) { - listener.endpointRemoved(oldEp); - } + endpointRemoved((Endpoint)entry.getValue()); } public void entryUpdated(Object key, Object oldValue, Object newValue) { @@ -457,46 +354,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi } Endpoint oldEp = (Endpoint)oldEntry.getValue(); Endpoint newEp = (Endpoint)newEntry.getValue(); - ((RuntimeEndpoint) newEp).bind(registry, this); - for (EndpointListener listener : listeners) { - listener.endpointUpdated(oldEp, newEp); - } - } - - public static void main(String[] args) throws Exception { - //create a channel - GroupChannel channel = new GroupChannel(); - McastService mcastService = (McastService)channel.getMembershipService(); - mcastService.setPort(MULTICAST_PORT); - mcastService.setAddress(MULTICAST_ADDRESS); - - -// ChannelReceiver rcv = channel.getChannelReceiver(); -// ReceiverBase rcvb = (ReceiverBase)rcv; -// rcvb.setPort(10480); - - InetAddress localhost = InetAddress.getLocalHost(); - - // REVIEW: In my case, there are multiple IP addresses - // One for the WIFI and the other one for VPN. For some reason the VPN one doesn't support - // Multicast - - // You can use "route add 228.0.0.0 mask 252.0.0.0 192.168.1.100" - mcastService.setBind(getBindAddress()); - channel.start(Channel.DEFAULT); - ReplicatedMap map = new ReplicatedMap(null, channel, 50, "01", null); - map.put(UUID.randomUUID().toString(), localhost.getHostAddress()); - for (int i = 0; i < 4; i++) { - Thread.sleep(3000); - System.out.println(localhost + ": " + map.keySet()); - } - for (Object e : map.entrySetFull()) { - Map.Entry en = (Map.Entry)e; - AbstractReplicatedMap.MapEntry entry = (AbstractReplicatedMap.MapEntry)en.getValue(); - System.out.println(entry); - } - map.breakdown(); - channel.stop(Channel.DEFAULT); + endpointUpdated(oldEp, newEp); } private static String getBindAddress() { diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/TribesDomainRegistryFactory.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/TribesDomainRegistryFactory.java new file mode 100644 index 0000000000..68add08276 --- /dev/null +++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/TribesDomainRegistryFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.endpoint.tribes; + +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.runtime.BaseDomainRegistryFactory; +import org.apache.tuscany.sca.runtime.EndpointRegistry; + +/** + * The utility responsible for finding the endpoint regstry by the scheme and creating instances for the + * given domain + */ +public class TribesDomainRegistryFactory extends BaseDomainRegistryFactory { + private final static String[] schemes = new String[] {"multicast", "tribes"}; + + /** + * @param extensionRegistry + */ + public TribesDomainRegistryFactory(ExtensionPointRegistry registry) { + super(registry); + } + + protected EndpointRegistry createEndpointRegistry(String endpointRegistryURI, String domainURI) { + EndpointRegistry endpointRegistry = + new ReplicatedEndpointRegistry(registry, null, endpointRegistryURI, domainURI); + return endpointRegistry; + } + + public String[] getSupportedSchemes() { + return schemes; + } +} -- cgit v1.2.3