From 132aa8a77685ec92bc90c03f987650d275a7b639 Mon Sep 17 00:00:00 2001 From: lresende Date: Mon, 30 Sep 2013 06:59:11 +0000 Subject: 2.0.1 RC1 release tag git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1527464 13f79535-47bb-0310-9956-ffa450edef68 --- .../endpoint/tribes/ReplicatedDomainRegistry.java | 509 +++++++++++++++++++++ 1 file changed, 509 insertions(+) create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedDomainRegistry.java (limited to 'sca-java-2.x/tags/2.0.1-RC1/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedDomainRegistry.java') diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedDomainRegistry.java b/sca-java-2.x/tags/2.0.1-RC1/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedDomainRegistry.java new file mode 100644 index 0000000000..e101f916dd --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedDomainRegistry.java @@ -0,0 +1,509 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.endpoint.tribes; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.URI; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.StringTokenizer; +import java.util.concurrent.Callable; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.xml.namespace.QName; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelReceiver; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.group.GroupChannel; +import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor; +import org.apache.catalina.tribes.membership.McastService; +import org.apache.catalina.tribes.membership.StaticMember; +import org.apache.catalina.tribes.transport.ReceiverBase; +import org.apache.tuscany.sca.assembly.Composite; +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.LifeCycleListener; +import org.apache.tuscany.sca.endpoint.tribes.AbstractReplicatedMap.MapEntry; +import org.apache.tuscany.sca.endpoint.tribes.MapStore.MapListener; +import org.apache.tuscany.sca.runtime.BaseDomainRegistry; +import org.apache.tuscany.sca.runtime.DomainRegistryURI; +import org.apache.tuscany.sca.runtime.DomainRegistry; +import org.apache.tuscany.sca.runtime.ContributionDescription; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; + +/** + * A replicated DomainRegistry based on Apache Tomcat Tribes + */ +public class ReplicatedDomainRegistry extends BaseDomainRegistry implements DomainRegistry, LifeCycleListener, + MapListener { + private final static Logger logger = Logger.getLogger(ReplicatedDomainRegistry.class.getName()); + private static final String MULTICAST_ADDRESS = "228.0.0.100"; + private static final int MULTICAST_PORT = 50000; + + private static final int FIND_REPEAT_COUNT = 10; + + private int port = MULTICAST_PORT; + private String address = MULTICAST_ADDRESS; + private String bind = null; + private int timeout = 50; + private String receiverAddress; + private int receiverPort = 4000; + private int receiverAutoBind = 100; + private List staticRoutes; + + private ReplicatedMap map; + + private String id; + private boolean noMultiCast; + + private static final GroupChannel createChannel(String address, int port, String bindAddress) { + + //create a channel + GroupChannel channel = new GroupChannel(); + McastService mcastService = (McastService)channel.getMembershipService(); + mcastService.setPort(port); + mcastService.setAddress(address); + + // REVIEW: In my case, there are multiple IP addresses + // One for the WIFI and the other one for VPN. For some reason the VPN one doesn't support + // Multicast + + if (bindAddress != null) { + mcastService.setBind(bindAddress); + } else { + mcastService.setBind(getBindAddress()); + } + + return channel; + } + + public ReplicatedDomainRegistry(ExtensionPointRegistry registry, + Map attributes, + String domainRegistryURI, + String domainURI) { + super(registry, attributes, domainRegistryURI, domainURI); + getParameters(attributes, domainRegistryURI); + } + + private Map getParameters(Map attributes, String domainRegistryURI) { + Map map = new HashMap(); + if (attributes != null) { + map.putAll(attributes); + } + URI uri = URI.create(domainRegistryURI); + if (uri.getHost() != null) { + map.put("address", uri.getHost()); + } + if (uri.getPort() != -1) { + map.put("port", String.valueOf(uri.getPort())); + } + + if (domainRegistryURI.startsWith("tuscany")) { + setTuscanyConfig(map, domainRegistryURI); + setConfig(map); + return map; + } + + int index = domainRegistryURI.indexOf('?'); + if (index == -1) { + setConfig(map); + return map; + } + String query = domainRegistryURI.substring(index + 1); + try { + query = URLDecoder.decode(query, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new IllegalArgumentException(e); + } + String[] params = query.split("&"); + for (String param : params) { + index = param.indexOf('='); + if (index != -1) { + map.put(param.substring(0, index), param.substring(index + 1)); + } + } + setConfig(map); + return map; + } + + private void setTuscanyConfig(Map map, String domainRegistryURI) { + DomainRegistryURI tuscanyURI = new DomainRegistryURI(domainRegistryURI); + map.put("address", tuscanyURI.getMulticastAddress()); + map.put("port", Integer.toString(tuscanyURI.getMulticastPort())); + map.put("bind", tuscanyURI.getBindAddress()); + map.put("receiverPort", Integer.toString(tuscanyURI.getListenPort())); + if (tuscanyURI.isMulticastDisabled()) { + map.put("nomcast", "true"); + } + if (tuscanyURI.getRemotes().size() > 0) { + String routes = ""; + for (int i=0; i attributes) { + String portStr = attributes.get("port"); + if (portStr != null) { + port = Integer.parseInt(portStr); + if (port == -1) { + port = MULTICAST_PORT; + } + } + String address = attributes.get("address"); + if (address == null) { + address = MULTICAST_ADDRESS; + } + bind = attributes.get("bind"); + String timeoutStr = attributes.get("timeout"); + if (timeoutStr != null) { + timeout = Integer.parseInt(timeoutStr); + } + + String routesStr = attributes.get("routes"); + if (routesStr != null) { + StringTokenizer st = new StringTokenizer(routesStr); + staticRoutes = new ArrayList(); + while (st.hasMoreElements()) { + staticRoutes.add(URI.create("tcp://" + st.nextToken())); + } + } + String mcast = attributes.get("nomcast"); + if (mcast != null) { + noMultiCast = Boolean.valueOf(mcast); + } + receiverAddress = attributes.get("receiverAddress"); + String recvPort = attributes.get("receiverPort"); + if (recvPort != null) { + receiverPort = Integer.parseInt(recvPort); + } + String recvAutoBind = attributes.get("receiverAutoBind"); + if (recvAutoBind != null) { + receiverAutoBind = Integer.parseInt(recvAutoBind); + } + } + + public void start() { + if (map != null) { + throw new IllegalStateException("The registry has already been started"); + } + GroupChannel channel = createChannel(address, port, bind); + map = + new ReplicatedMap(null, channel, timeout, this.domainURI, + new ClassLoader[] {ReplicatedDomainRegistry.class.getClassLoader()}); + map.addListener(this); + + if (noMultiCast) { + map.getChannel().addInterceptor(new DisableMcastInterceptor()); + } + + // Configure the receiver ports + ChannelReceiver receiver = channel.getChannelReceiver(); + if (receiver instanceof ReceiverBase) { + if (receiverAddress != null) { + ((ReceiverBase)receiver).setAddress(receiverAddress); + } + ((ReceiverBase)receiver).setPort(receiverPort); + ((ReceiverBase)receiver).setAutoBind(receiverAutoBind); + } + + /* + Object sender = channel.getChannelSender(); + if (sender instanceof ReplicationTransmitter) { + sender = ((ReplicationTransmitter)sender).getTransport(); + } + if (sender instanceof AbstractSender) { + ((AbstractSender)sender).setKeepAliveCount(0); + ((AbstractSender)sender).setMaxRetryAttempts(5); + } + */ + + if (staticRoutes != null) { + StaticMembershipInterceptor smi = new StaticMembershipInterceptor(); + for (URI staticRoute : staticRoutes) { + Member member; + try { + // The port has to match the receiver port + member = new StaticMember(staticRoute.getHost(), staticRoute.getPort(), 5000); + } catch (IOException e) { + throw new RuntimeException(e); + } + smi.addStaticMember(member); + logger.info("Added static route: " + staticRoute.getHost() + ":" + staticRoute.getPort()); + } + smi.setLocalMember(map.getChannel().getLocalMember(false)); + map.getChannel().addInterceptor(smi); + } + + try { + map.getChannel().start(Channel.DEFAULT); + } catch (ChannelException e) { + throw new IllegalStateException(e); + } + + } + + public void stop() { + if (map != null) { + map.removeListener(this); + Channel channel = map.getChannel(); + map.breakdown(); + try { + channel.stop(Channel.DEFAULT); + } catch (ChannelException e) { + logger.log(Level.WARNING, e.getMessage(), e); + } + map = null; + } + } + + public void addEndpoint(Endpoint endpoint) { + map.put(endpoint.getURI(), endpoint); + logger.info("Add endpoint - " + endpoint); + } + + public List findEndpoint(String uri) { + List foundEndpoints = new ArrayList(); + + // in the failure case we repeat the look up after a short + // delay to take account of tribes replication delays + int repeat = FIND_REPEAT_COUNT; + + while (repeat > 0) { + for (Object v : map.values()) { + Endpoint endpoint = (Endpoint)v; + // TODO: implement more complete matching + logger.fine("Matching against - " + endpoint); + if (endpoint.matches(uri)) { + MapEntry entry = map.getInternal(endpoint.getURI()); + // if (!entry.isPrimary()) { + ((RuntimeEndpoint)endpoint).bind(registry, this); + // } + foundEndpoints.add(endpoint); + logger.fine("Found endpoint with matching service - " + endpoint); + repeat = 0; + } + // else the service name doesn't match + } + + if (foundEndpoints.size() == 0) { + // the service name doesn't match any endpoints so wait a little and try + // again in case this is caused by tribes synch delays + logger.info("Repeating endpoint reference match - " + uri); + repeat--; + try { + Thread.sleep(1000); + } catch (Exception ex) { + // do nothing + repeat = 0; + } + } + } + + return foundEndpoints; + } + + private boolean isLocal(MapEntry entry) { + return entry.getPrimary().equals(map.getChannel().getLocalMember(false)); + } + + public Endpoint getEndpoint(String uri) { + return (Endpoint)map.get(uri); + } + + public List getEndpoints() { + return new ArrayList(map.values()); + } + + public void removeEndpoint(Endpoint endpoint) { + map.remove(endpoint.getURI()); + logger.info("Remove endpoint - " + endpoint); + } + + public void replicate(boolean complete) { + map.replicate(complete); + } + + public void updateEndpoint(String uri, Endpoint endpoint) { + Endpoint oldEndpoint = getEndpoint(uri); + if (oldEndpoint == null) { + throw new IllegalArgumentException("Endpoint is not found: " + uri); + } + map.put(endpoint.getURI(), endpoint); + } + + public void entryAdded(Object key, Object value) { + MapEntry entry = (MapEntry)value; + Endpoint newEp = (Endpoint)entry.getValue(); + if (!isLocal(entry)) { + logger.info(id + " Remote endpoint added: " + entry.getValue()); + } + endpointAdded(newEp); + } + + public void entryRemoved(Object key, Object value) { + MapEntry entry = (MapEntry)value; + if (!isLocal(entry)) { + logger.info(id + " Remote endpoint removed: " + entry.getValue()); + } + endpointRemoved((Endpoint)entry.getValue()); + } + + public void entryUpdated(Object key, Object oldValue, Object newValue) { + MapEntry oldEntry = (MapEntry)oldValue; + MapEntry newEntry = (MapEntry)newValue; + if (!isLocal(newEntry)) { + logger.info(id + " Remote endpoint updated: " + newEntry.getValue()); + } + Endpoint oldEp = (Endpoint)oldEntry.getValue(); + Endpoint newEp = (Endpoint)newEntry.getValue(); + endpointUpdated(oldEp, newEp); + } + + private static String getBindAddress() { + try { + Enumeration nis = NetworkInterface.getNetworkInterfaces(); + while (nis.hasMoreElements()) { + NetworkInterface ni = nis.nextElement(); + // The following APIs require JDK 1.6 + /* + if (ni.isLoopback() || !ni.isUp() || !ni.supportsMulticast()) { + continue; + } + */ + Enumeration ips = ni.getInetAddresses(); + if (!ips.hasMoreElements()) { + continue; + } + while (ips.hasMoreElements()) { + InetAddress addr = ips.nextElement(); + if (addr.isLoopbackAddress()) { + continue; + } + return addr.getHostAddress(); + } + } + return InetAddress.getLocalHost().getHostAddress(); + } catch (Exception e) { + logger.log(Level.SEVERE, e.getMessage(), e); + return null; + } + } + + @Override + public List getInstalledContributionURIs() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void uninstallContribution(String uri) { + // TODO Auto-generated method stub + + } + + @Override + public void installContribution(ContributionDescription cd) { + // TODO Auto-generated method stub + + } + + @Override + public ContributionDescription getInstalledContribution(String uri) { + // TODO Auto-generated method stub + return null; + } + + @Override + public void addRunningComposite(String contributionURI, Composite composite) { + // TODO Auto-generated method stub + + } + + @Override + public void removeRunningComposite(String contributionURI, String compositeURI) { + // TODO Auto-generated method stub + + } + + @Override + public Composite getRunningComposite(String contributionURI, String compositeURI) { + // TODO Auto-generated method stub + return null; + } + + @Override + public Map> getRunningCompositeURIs() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void updateInstalledContribution(ContributionDescription cd) { + // TODO Auto-generated method stub + + } + + @Override + public List getNodeNames() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getLocalNodeName() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getRunningNodeName(String contributionURI, String compositeURI) { + // TODO Auto-generated method stub + return null; + } + + @Override + public String remoteCommand(String memberName, Callable command) { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getContainingCompositesContributionURI(String componentName) { + // TODO Auto-generated method stub + return null; + } + +} -- cgit v1.2.3