From 9d6f2d2fd74f310f2c9a87a69f44ac7091f450d3 Mon Sep 17 00:00:00 2001 From: rfeng Date: Wed, 11 Nov 2009 18:04:06 +0000 Subject: Start to implement the EndpointRegistry interface git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@834990 13f79535-47bb-0310-9956-ffa450edef68 --- .../endpoint/zookeeper/AbstractDistributedMap.java | 14 +- .../endpoint/zookeeper/DistributedRegistry.java | 244 +++++++++++++++------ .../sca/endpoint/zookeeper/ZooKeeperHelper.java | 36 +-- 3 files changed, 184 insertions(+), 110 deletions(-) (limited to 'java') diff --git a/java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java b/java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java index 68133db7c0..d5b8e74631 100644 --- a/java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java +++ b/java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.ObjectStreamClass; -import java.io.UnsupportedEncodingException; import java.util.AbstractMap; import java.util.AbstractSet; import java.util.Collection; @@ -113,14 +112,6 @@ public class AbstractDistributedMap extends AbstractMap implements return buffer.toString(); } - private byte[] getBytes(String str) { - try { - return str.getBytes("UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new IllegalArgumentException(e); - } - } - protected byte[] serialize(V value) throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(bos); @@ -150,6 +141,10 @@ public class AbstractDistributedMap extends AbstractMap implements @Override public V get(Object key) { String path = getPath(root, getName(key)); + return getData(path); + } + + protected V getData(String path) { try { Stat stat = new Stat(); byte[] data = zooKeeper.getData(path, false, stat); @@ -163,6 +158,7 @@ public class AbstractDistributedMap extends AbstractMap implements } catch (Throwable e) { throw new ServiceRuntimeException(e); } + } @Override diff --git a/java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/DistributedRegistry.java b/java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/DistributedRegistry.java index 1e311da0ba..22d1d4d00a 100644 --- a/java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/DistributedRegistry.java +++ b/java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/DistributedRegistry.java @@ -20,30 +20,50 @@ package org.apache.tuscany.sca.endpoint.zookeeper; import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.logging.Logger; import org.apache.tuscany.sca.assembly.Endpoint; import org.apache.tuscany.sca.assembly.EndpointReference; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.core.LifeCycleListener; import org.apache.tuscany.sca.runtime.EndpointListener; import org.apache.tuscany.sca.runtime.EndpointRegistry; +import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.ZooKeeper; import org.oasisopen.sca.ServiceRuntimeException; /** * */ -public class DistributedRegistry implements EndpointRegistry, LifeCycleListener { +public class DistributedRegistry extends AbstractDistributedMap implements EndpointRegistry, + LifeCycleListener { + + private static final Logger logger = Logger.getLogger(DistributedRegistry.class.getName()); + private List listeners = new CopyOnWriteArrayList(); + private List endpointreferences = new CopyOnWriteArrayList(); + + private ExtensionPointRegistry registry; private String domainURI; private String registryURI; - private ZooKeeper zooKeeper; /** * */ - public DistributedRegistry(String domainURI, String registryURI) { + public DistributedRegistry(ExtensionPointRegistry registry, + Map attributes, + String domainRegistryURI, + String domainURI) { + super(null, null, null); this.domainURI = domainURI; - this.registryURI = registryURI; + this.registryURI = domainRegistryURI; + Map config = parseURI(attributes, domainRegistryURI); } public void start() { @@ -65,108 +85,200 @@ public class DistributedRegistry implements EndpointRegistry, LifeCycleListener } } - /* (non-Javadoc) - * @see org.apache.tuscany.sca.runtime.EndpointRegistry#addEndpoint(org.apache.tuscany.sca.assembly.Endpoint) - */ - public void addEndpoint(Endpoint endpoint) { - // TODO Auto-generated method stub + private Map parseURI(Map attributes, String domainRegistryURI) { + Map map = new HashMap(); + if (attributes != null) { + map.putAll(attributes); + } + // Should be zookeeper:host1:port1,host2:port2?sessionTimeout=100 + int index = domainRegistryURI.indexOf(':'); + String path = domainRegistryURI.substring(index + 1); + + index = path.indexOf('?'); + if (index == -1) { + map.put("hosts", path); + return map; + } + map.put("hosts", path.substring(0, index)); + String query = path.substring(index + 1); + try { + query = URLDecoder.decode(query, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new IllegalArgumentException(e); + } + String[] params = query.split("&"); + for (String param : params) { + index = param.indexOf('='); + if (index != -1) { + map.put(param.substring(0, index), param.substring(index + 1)); + } + } + return map; + } + public void addEndpoint(Endpoint endpoint) { + put(endpoint.getURI(), endpoint); + logger.info("Add endpoint - " + endpoint); } - /* (non-Javadoc) - * @see org.apache.tuscany.sca.runtime.EndpointRegistry#addEndpointReference(org.apache.tuscany.sca.assembly.EndpointReference) - */ public void addEndpointReference(EndpointReference endpointReference) { - // TODO Auto-generated method stub + endpointreferences.add(endpointReference); + logger.fine("Add endpoint reference - " + endpointReference); + } + public void addListener(EndpointListener listener) { + listeners.add(listener); } - /* (non-Javadoc) - * @see org.apache.tuscany.sca.runtime.EndpointRegistry#addListener(org.apache.tuscany.sca.runtime.EndpointListener) + /** + * Parse the component/service/binding URI into an array of parts (componentURI, serviceName, bindingName) + * @param uri + * @return */ - public void addListener(EndpointListener listener) { - // TODO Auto-generated method stub + private String[] parse(String uri) { + String[] names = new String[3]; + int index = uri.lastIndexOf('#'); + if (index == -1) { + names[0] = uri; + } else { + names[0] = uri.substring(0, index); + String str = uri.substring(index + 1); + if (str.startsWith("service-binding(") && str.endsWith(")")) { + str = str.substring("service-binding(".length(), str.length() - 1); + String[] parts = str.split("/"); + if (parts.length != 2) { + throw new IllegalArgumentException("Invalid service-binding URI: " + uri); + } + names[1] = parts[0]; + names[2] = parts[1]; + } else if (str.startsWith("service(") && str.endsWith(")")) { + str = str.substring("service(".length(), str.length() - 1); + names[1] = str; + } else { + throw new IllegalArgumentException("Invalid component/service/binding URI: " + uri); + } + } + return names; + } + private boolean matches(String target, String uri) { + String[] parts1 = parse(target); + String[] parts2 = parse(uri); + for (int i = 0; i < parts1.length; i++) { + if (parts1[i] == null || parts1[i].equals(parts2[i])) { + continue; + } else { + return false; + } + } + return true; } - /* (non-Javadoc) - * @see org.apache.tuscany.sca.runtime.EndpointRegistry#findEndpoint(org.apache.tuscany.sca.assembly.EndpointReference) - */ public List findEndpoint(EndpointReference endpointReference) { - // TODO Auto-generated method stub - return null; + List foundEndpoints = new ArrayList(); + + logger.fine("Find endpoint for reference - " + endpointReference); + + if (endpointReference.getReference() != null) { + Endpoint targetEndpoint = endpointReference.getTargetEndpoint(); + for (Object v : values()) { + Endpoint endpoint = (Endpoint)v; + // TODO: implement more complete matching + logger.fine("Matching against - " + endpoint); + if (matches(targetEndpoint.getURI(), endpoint.getURI())) { + // if (!entry.isPrimary()) { + endpoint.setExtensionPointRegistry(registry); + // } + foundEndpoints.add(endpoint); + logger.fine("Found endpoint with matching service - " + endpoint); + } + // else the service name doesn't match + } + } + return foundEndpoints; } - /* (non-Javadoc) - * @see org.apache.tuscany.sca.runtime.EndpointRegistry#findEndpointReference(org.apache.tuscany.sca.assembly.Endpoint) - */ public List findEndpointReference(Endpoint endpoint) { - // TODO Auto-generated method stub - return null; + return endpointreferences; } - /* (non-Javadoc) - * @see org.apache.tuscany.sca.runtime.EndpointRegistry#getEndpoint(java.lang.String) - */ public Endpoint getEndpoint(String uri) { - // TODO Auto-generated method stub - return null; + return get(uri); } - /* (non-Javadoc) - * @see org.apache.tuscany.sca.runtime.EndpointRegistry#getEndpointRefereneces() - */ public List getEndpointRefereneces() { - // TODO Auto-generated method stub - return null; + return endpointreferences; } - /* (non-Javadoc) - * @see org.apache.tuscany.sca.runtime.EndpointRegistry#getEndpoints() - */ public List getEndpoints() { - // TODO Auto-generated method stub - return null; + return new ArrayList(values()); } - /* (non-Javadoc) - * @see org.apache.tuscany.sca.runtime.EndpointRegistry#getListeners() - */ public List getListeners() { - // TODO Auto-generated method stub - return null; + return listeners; } - /* (non-Javadoc) - * @see org.apache.tuscany.sca.runtime.EndpointRegistry#removeEndpoint(org.apache.tuscany.sca.assembly.Endpoint) - */ public void removeEndpoint(Endpoint endpoint) { - // TODO Auto-generated method stub - + remove(endpoint.getURI()); + logger.info("Remove endpoint - " + endpoint); } - /* (non-Javadoc) - * @see org.apache.tuscany.sca.runtime.EndpointRegistry#removeEndpointReference(org.apache.tuscany.sca.assembly.EndpointReference) - */ public void removeEndpointReference(EndpointReference endpointReference) { - // TODO Auto-generated method stub - + endpointreferences.remove(endpointReference); + logger.fine("Remove endpoint reference - " + endpointReference); } - /* (non-Javadoc) - * @see org.apache.tuscany.sca.runtime.EndpointRegistry#removeListener(org.apache.tuscany.sca.runtime.EndpointListener) - */ public void removeListener(EndpointListener listener) { - // TODO Auto-generated method stub - + listeners.remove(listener); } - /* (non-Javadoc) - * @see org.apache.tuscany.sca.runtime.EndpointRegistry#updateEndpoint(java.lang.String, org.apache.tuscany.sca.assembly.Endpoint) - */ public void updateEndpoint(String uri, Endpoint endpoint) { - // TODO Auto-generated method stub + Endpoint oldEndpoint = getEndpoint(uri); + if (oldEndpoint == null) { + throw new IllegalArgumentException("Endpoint is not found: " + uri); + } + put(endpoint.getURI(), endpoint); + } + public void entryAdded(Endpoint value) { + value.setExtensionPointRegistry(registry); + for (EndpointListener listener : listeners) { + listener.endpointAdded(value); + } + } + + public void entryRemoved(Endpoint value) { + for (EndpointListener listener : listeners) { + listener.endpointRemoved(value); + } + } + + public void entryUpdated(Endpoint oldEp, Endpoint newEp) { + newEp.setExtensionPointRegistry(registry); + for (EndpointListener listener : listeners) { + listener.endpointUpdated(oldEp, newEp); + } + } + + @Override + public void process(WatchedEvent event) { + super.process(event); + String path = event.getPath(); + if (path == null || !path.startsWith(getPath(root))) { + return; + } + switch (event.getType()) { + case NodeChildrenChanged: + break; + case NodeCreated: + case NodeDataChanged: + Endpoint ep = getData(path); + entryAdded(ep); + break; + case NodeDeleted: + entryRemoved(null); + break; + } } } diff --git a/java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/ZooKeeperHelper.java b/java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/ZooKeeperHelper.java index 3f7a5c63be..e92df3d91d 100644 --- a/java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/ZooKeeperHelper.java +++ b/java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/ZooKeeperHelper.java @@ -22,13 +22,9 @@ package org.apache.tuscany.sca.endpoint.zookeeper; import java.io.File; import java.io.IOException; import java.io.UnsupportedEncodingException; -import java.net.URI; -import java.net.URLDecoder; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -170,37 +166,7 @@ public class ZooKeeperHelper implements Watcher { return null; } - private Map parseURI(Map attributes, String domainRegistryURI) { - Map map = new HashMap(); - if (attributes != null) { - map.putAll(attributes); - } - URI uri = URI.create(domainRegistryURI); - if (uri.getHost() != null) { - map.put("host", uri.getHost()); - } - if (uri.getPort() != -1) { - map.put("port", String.valueOf(uri.getPort())); - } - int index = domainRegistryURI.indexOf('?'); - if (index == -1) { - return map; - } - String query = domainRegistryURI.substring(index + 1); - try { - query = URLDecoder.decode(query, "UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new IllegalArgumentException(e); - } - String[] params = query.split("&"); - for (String param : params) { - index = param.indexOf('='); - if (index != -1) { - map.put(param.substring(0, index), param.substring(index + 1)); - } - } - return map; - } + public static void main(final String[] args) throws Exception { final String options[] = -- cgit v1.2.3