summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/contrib
diff options
context:
space:
mode:
authorantelder <antelder@13f79535-47bb-0310-9956-ffa450edef68>2010-05-27 10:35:03 +0000
committerantelder <antelder@13f79535-47bb-0310-9956-ffa450edef68>2010-05-27 10:35:03 +0000
commitd84d07f27f71edf221b804027fc5a20834aa92cb (patch)
treeab68264e5aaec4479a29683a1f378baba4f69cc4 /sca-java-2.x/contrib
parentc909f80892d8230c64a4bb48fe1930cae9ad34e9 (diff)
Move module to contrib as its not in the build or being actively worked on
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@948769 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'sca-java-2.x/contrib')
-rw-r--r--sca-java-2.x/contrib/modules/endpoint-zookeeper/META-INF/MANIFEST.MF17
-rw-r--r--sca-java-2.x/contrib/modules/endpoint-zookeeper/pom.xml76
-rw-r--r--sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java297
-rw-r--r--sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/DistributedRegistry.java312
-rw-r--r--sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/LocalZooKeeperServer.java148
-rw-r--r--sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/ZooKeeperDomainRegistryFactory.java49
-rw-r--r--sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/ZooKeeperHelper.java200
-rw-r--r--sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.DomainRegistryFactory17
-rw-r--r--sca-java-2.x/contrib/modules/endpoint-zookeeper/src/test/java/org/apache/tuscany/sca/endpoint/zookeeper/DistributedMapTestCase.java111
-rw-r--r--sca-java-2.x/contrib/modules/endpoint-zookeeper/src/test/java/org/apache/tuscany/sca/endpoint/zookeeper/LocalZooKeeperServerTestCase.java85
10 files changed, 1312 insertions, 0 deletions
diff --git a/sca-java-2.x/contrib/modules/endpoint-zookeeper/META-INF/MANIFEST.MF b/sca-java-2.x/contrib/modules/endpoint-zookeeper/META-INF/MANIFEST.MF
new file mode 100644
index 0000000000..d9744287d4
--- /dev/null
+++ b/sca-java-2.x/contrib/modules/endpoint-zookeeper/META-INF/MANIFEST.MF
@@ -0,0 +1,17 @@
+Manifest-Version: 1.0
+SCA-Version: 1.1
+Bundle-Name: Apache Tuscany SCA Tomcat Tribes Based EndPoint Registry
+Bundle-Vendor: The Apache Software Foundation
+Bundle-Version: 2.0.0
+Bundle-ManifestVersion: 2
+Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt
+Bundle-Description: Apache Tuscany SCA XSD Model
+Bundle-SymbolicName: org.apache.tuscany.sca.endpoint.tribes
+Bundle-DocURL: http://www.apache.org/
+Bundle-RequiredExecutionEnvironment: J2SE-1.5,JavaSE-1.6
+Import-Package: org.apache.tuscany.sca.assembly;version="2.0.0",
+ org.apache.tuscany.sca.core.assembly.impl;version="2.0.0";scope=internal;resolution:=optional,
+ org.apache.tuscany.sca.core;version="2.0.0",
+ org.apache.tuscany.sca.management;version="2.0.0",
+ org.apache.tuscany.sca.policy;version="2.0.0",
+ org.apache.tuscany.sca.runtime;version="2.0.0"
diff --git a/sca-java-2.x/contrib/modules/endpoint-zookeeper/pom.xml b/sca-java-2.x/contrib/modules/endpoint-zookeeper/pom.xml
new file mode 100644
index 0000000000..cd404ab331
--- /dev/null
+++ b/sca-java-2.x/contrib/modules/endpoint-zookeeper/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+<project>
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.tuscany.sca</groupId>
+ <artifactId>tuscany-modules</artifactId>
+ <version>2.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <artifactId>tuscany-endpoint-zookeeper</artifactId>
+ <name>Apache Tuscany SCA Hadoop ZooKeeper Based EndPoint Registry</name>
+
+ <!-- http://issues.apache.org/jira/browse/ZOOKEEPER-224 -->
+ <repositories>
+ <repository>
+ <id>zookeeper.repo</id>
+ <url>http://people.apache.org/~chirino/zk-repo/</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.2.1</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.15</version>
+ <scope>runtime</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tuscany.sca</groupId>
+ <artifactId>tuscany-core-spi</artifactId>
+ <version>2.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java b/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java
new file mode 100644
index 0000000000..d5b8e74631
--- /dev/null
+++ b/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.util.AbstractMap;
+import java.util.AbstractSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+import org.oasisopen.sca.ServiceRuntimeException;
+
+/**
+ *
+ */
+public class AbstractDistributedMap<V> extends AbstractMap<String, V> implements Map<String, V>, Watcher {
+ protected ZooKeeper zooKeeper;
+ protected ClassLoader classLoader;
+ protected String root;
+
+ /**
+ * @param zooKeeper
+ * @param root
+ * @param classLoader
+ */
+ public AbstractDistributedMap(ZooKeeper zooKeeper, String root, ClassLoader classLoader) {
+ super();
+ this.zooKeeper = zooKeeper;
+ this.root = root;
+ this.classLoader = classLoader;
+ }
+
+ public void start() {
+ // FIXME:
+ this.zooKeeper.register(this);
+ try {
+ String path = getPath(root);
+ Stat stat = zooKeeper.exists(path, false);
+ if (stat == null) {
+ zooKeeper.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ } catch (Exception e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ @Override
+ public Set<Map.Entry<String, V>> entrySet() {
+ String path = getPath(root);
+ List<String> children = Collections.emptyList();
+ try {
+ try {
+ children = zooKeeper.getChildren(path, false);
+ } catch (KeeperException e) {
+ if (e.code() == Code.NONODE) {
+ return Collections.emptySet();
+ } else {
+ throw e;
+ }
+ }
+ } catch (Throwable e) {
+ throw new ServiceRuntimeException(e);
+ }
+ return new EntrySet(children);
+ }
+
+ protected String getName(Object key) {
+ String name = String.valueOf(key);
+ name = name.replace("$", "$$");
+ return name.replace('/', '$');
+ }
+
+ public String getPath(String... uris) {
+ StringBuffer buffer = new StringBuffer();
+ for (String uri : uris) {
+ buffer.append('/').append(getName(uri));
+ }
+ return buffer.toString();
+ }
+
+ protected byte[] serialize(V value) throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeObject(value);
+ oos.close();
+ return bos.toByteArray();
+ }
+
+ protected V deserialize(byte[] bytes, final ClassLoader classLoader) throws IOException, ClassNotFoundException {
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bis) {
+ protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
+ ClassLoader loader = classLoader;
+ if (loader == null) {
+ loader = Thread.currentThread().getContextClassLoader();
+ }
+ try {
+ return Class.forName(desc.getName(), false, loader);
+ } catch (ClassNotFoundException e) {
+ return super.resolveClass(desc);
+ }
+ }
+ };
+ return (V)ois.readObject();
+ }
+
+ @Override
+ public V get(Object key) {
+ String path = getPath(root, getName(key));
+ return getData(path);
+ }
+
+ protected V getData(String path) {
+ try {
+ Stat stat = new Stat();
+ byte[] data = zooKeeper.getData(path, false, stat);
+ return deserialize(data, classLoader);
+ } catch (KeeperException e) {
+ if (e.code() == Code.NONODE) {
+ return null;
+ } else {
+ throw new ServiceRuntimeException(e);
+ }
+ } catch (Throwable e) {
+ throw new ServiceRuntimeException(e);
+ }
+
+ }
+
+ @Override
+ public V put(String key, V value) {
+ try {
+ String path = getPath(root, getName(key));
+ Stat stat = new Stat();
+ byte[] data = serialize(value);
+
+ try {
+ byte[] oldData = zooKeeper.getData(path, false, stat);
+ zooKeeper.setData(path, data, -1);
+ return deserialize(oldData, classLoader);
+ } catch (KeeperException e) {
+ if (e.code() == Code.NONODE) {
+ zooKeeper.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ return null;
+ } else {
+ throw e;
+ }
+ }
+ } catch (Throwable e) {
+ throw new ServiceRuntimeException(e);
+ }
+
+ }
+
+ @Override
+ public V remove(Object key) {
+ try {
+ String path = getPath(root, getName(key));
+ try {
+ Stat stat = new Stat();
+ byte[] oldData = zooKeeper.getData(path, false, stat);
+ zooKeeper.delete(path, -1);
+ return deserialize(oldData, classLoader);
+ } catch (KeeperException e) {
+ if (e.code() == Code.NONODE) {
+ return null;
+ } else {
+ throw e;
+ }
+ }
+ } catch (Throwable e) {
+ throw new ServiceRuntimeException(e);
+ }
+
+ }
+
+ private class MapEntry implements Map.Entry<String, V> {
+ private String key;
+
+ /**
+ * @param key
+ */
+ public MapEntry(String key) {
+ super();
+ this.key = key;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public V getValue() {
+ try {
+ try {
+ byte[] data = zooKeeper.getData(getPath(root, getName(key)), false, new Stat());
+ return deserialize(data, classLoader);
+ } catch (KeeperException e) {
+ if (e.code() == Code.NONODE) {
+ return null;
+ } else {
+ throw e;
+ }
+ }
+ } catch (Throwable e) {
+ throw new ServiceRuntimeException(e);
+ }
+ }
+
+ public V setValue(V value) {
+ return put(key, value);
+ }
+ }
+
+ private class EntrySet extends AbstractSet<Map.Entry<String, V>> {
+ private final int size;
+ private final Iterator<String> childrenIterator;
+
+ /**
+ * @param size
+ * @param childrenIterator
+ */
+ public EntrySet(Collection<String> children) {
+ super();
+ this.size = children.size();
+ this.childrenIterator = children.iterator();
+ }
+
+ @Override
+ public Iterator<Map.Entry<String, V>> iterator() {
+ return new Iterator<Map.Entry<String, V>>() {
+ private String path;
+
+ public boolean hasNext() {
+ return childrenIterator.hasNext();
+ }
+
+ public Map.Entry<String, V> next() {
+ path = childrenIterator.next();
+ return new MapEntry(path);
+ }
+
+ public void remove() {
+ childrenIterator.remove();
+ try {
+ zooKeeper.delete(getPath(root, path), -1);
+ } catch (Throwable e) {
+ throw new ServiceRuntimeException(e);
+ }
+ }
+ };
+
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+ }
+
+ public void process(WatchedEvent event) {
+ System.out.println(event);
+ }
+}
diff --git a/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/DistributedRegistry.java b/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/DistributedRegistry.java
new file mode 100644
index 0000000000..9923b1e0f7
--- /dev/null
+++ b/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/DistributedRegistry.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.zookeeper;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.logging.Logger;
+
+import org.apache.tuscany.sca.assembly.Endpoint;
+import org.apache.tuscany.sca.assembly.EndpointReference;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.LifeCycleListener;
+import org.apache.tuscany.sca.runtime.EndpointListener;
+import org.apache.tuscany.sca.runtime.EndpointRegistry;
+import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.ZooKeeper;
+import org.oasisopen.sca.ServiceRuntimeException;
+
+/**
+ *
+ */
+public class DistributedRegistry extends AbstractDistributedMap<Endpoint> implements EndpointRegistry,
+ LifeCycleListener {
+
+ private static final Logger logger = Logger.getLogger(DistributedRegistry.class.getName());
+ private List<EndpointListener> listeners = new CopyOnWriteArrayList<EndpointListener>();
+ private List<EndpointReference> endpointreferences = new CopyOnWriteArrayList<EndpointReference>();
+
+ private ExtensionPointRegistry registry;
+ private String domainURI;
+ private String registryURI;
+ private String hosts = null;
+ private int sessionTimeout = 100;
+
+ /**
+ *
+ */
+ public DistributedRegistry(ExtensionPointRegistry registry,
+ Map<String, String> attributes,
+ String domainRegistryURI,
+ String domainURI) {
+ super(null, null, null);
+ this.domainURI = domainURI;
+ this.registryURI = domainRegistryURI;
+ Map<String, String> config = parseURI(attributes, registryURI);
+ hosts = config.get("hosts");
+ String timeout = config.get("sessionTimeout");
+ if (timeout != null) {
+ sessionTimeout = Integer.parseInt(timeout.trim());
+ }
+ }
+
+ public void start() {
+ try {
+ zooKeeper = new ZooKeeper(registryURI, sessionTimeout, null);
+ } catch (IOException e) {
+ throw new ServiceRuntimeException(e);
+ }
+ }
+
+ public void stop() {
+ if (zooKeeper != null) {
+ try {
+ zooKeeper.close();
+ } catch (InterruptedException e) {
+ throw new ServiceRuntimeException(e);
+ }
+ zooKeeper = null;
+ }
+ }
+
+ private Map<String, String> parseURI(Map<String, String> attributes, String domainRegistryURI) {
+ Map<String, String> map = new HashMap<String, String>();
+ if (attributes != null) {
+ map.putAll(attributes);
+ }
+ // Should be zookeeper:host1:port1,host2:port2?sessionTimeout=100
+ int index = domainRegistryURI.indexOf(':');
+ String path = domainRegistryURI.substring(index + 1);
+
+ index = path.indexOf('?');
+ if (index == -1) {
+ map.put("hosts", path);
+ return map;
+ }
+ map.put("hosts", path.substring(0, index));
+ String query = path.substring(index + 1);
+ try {
+ query = URLDecoder.decode(query, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new IllegalArgumentException(e);
+ }
+ String[] params = query.split("&");
+ for (String param : params) {
+ index = param.indexOf('=');
+ if (index != -1) {
+ map.put(param.substring(0, index), param.substring(index + 1));
+ }
+ }
+ return map;
+ }
+
+ public void addEndpoint(Endpoint endpoint) {
+ put(endpoint.getURI(), endpoint);
+ logger.info("Add endpoint - " + endpoint);
+ }
+
+ public void addEndpointReference(EndpointReference endpointReference) {
+ endpointreferences.add(endpointReference);
+ logger.fine("Add endpoint reference - " + endpointReference);
+ }
+
+ public void addListener(EndpointListener listener) {
+ listeners.add(listener);
+ }
+
+ /**
+ * Parse the component/service/binding URI into an array of parts (componentURI, serviceName, bindingName)
+ * @param uri
+ * @return
+ */
+ private String[] parse(String uri) {
+ String[] names = new String[3];
+ int index = uri.lastIndexOf('#');
+ if (index == -1) {
+ names[0] = uri;
+ } else {
+ names[0] = uri.substring(0, index);
+ String str = uri.substring(index + 1);
+ if (str.startsWith("service-binding(") && str.endsWith(")")) {
+ str = str.substring("service-binding(".length(), str.length() - 1);
+ String[] parts = str.split("/");
+ if (parts.length != 2) {
+ throw new IllegalArgumentException("Invalid service-binding URI: " + uri);
+ }
+ names[1] = parts[0];
+ names[2] = parts[1];
+ } else if (str.startsWith("service(") && str.endsWith(")")) {
+ str = str.substring("service(".length(), str.length() - 1);
+ names[1] = str;
+ } else {
+ throw new IllegalArgumentException("Invalid component/service/binding URI: " + uri);
+ }
+ }
+ return names;
+ }
+
+ private boolean matches(String target, String uri) {
+ String[] parts1 = parse(target);
+ String[] parts2 = parse(uri);
+ for (int i = 0; i < parts1.length; i++) {
+ if (parts1[i] == null || parts1[i].equals(parts2[i])) {
+ continue;
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public List<Endpoint> findEndpoint(EndpointReference endpointReference) {
+ List<Endpoint> foundEndpoints = new ArrayList<Endpoint>();
+
+ logger.fine("Find endpoint for reference - " + endpointReference);
+
+ if (endpointReference.getReference() != null) {
+ Endpoint targetEndpoint = endpointReference.getTargetEndpoint();
+ String uri = targetEndpoint.getURI();
+ lookup(foundEndpoints, uri);
+ }
+ return foundEndpoints;
+ }
+
+ private void lookup(List<Endpoint> foundEndpoints, String uri) {
+ for (Object v : values()) {
+ Endpoint endpoint = (Endpoint)v;
+ // TODO: implement more complete matching
+ logger.fine("Matching against - " + endpoint);
+ if (matches(uri, endpoint.getURI())) {
+ // if (!entry.isPrimary()) {
+ ((RuntimeEndpoint)endpoint).bind(registry, this);
+ // }
+ foundEndpoints.add(endpoint);
+ logger.fine("Found endpoint with matching service - " + endpoint);
+ }
+ // else the service name doesn't match
+ }
+ }
+
+ public List<EndpointReference> findEndpointReference(Endpoint endpoint) {
+ return endpointreferences;
+ }
+
+ public Endpoint getEndpoint(String uri) {
+ return get(uri);
+ }
+
+ public List<EndpointReference> getEndpointRefereneces() {
+ return endpointreferences;
+ }
+
+ public Collection<Endpoint> getEndpoints() {
+ return new ArrayList<Endpoint>(values());
+ }
+
+ public List<EndpointListener> getListeners() {
+ return listeners;
+ }
+
+ public void removeEndpoint(Endpoint endpoint) {
+ remove(endpoint.getURI());
+ logger.info("Remove endpoint - " + endpoint);
+ }
+
+ public void removeEndpointReference(EndpointReference endpointReference) {
+ endpointreferences.remove(endpointReference);
+ logger.fine("Remove endpoint reference - " + endpointReference);
+ }
+
+ public void removeListener(EndpointListener listener) {
+ listeners.remove(listener);
+ }
+
+ public void updateEndpoint(String uri, Endpoint endpoint) {
+ Endpoint oldEndpoint = getEndpoint(uri);
+ if (oldEndpoint == null) {
+ throw new IllegalArgumentException("Endpoint is not found: " + uri);
+ }
+ put(endpoint.getURI(), endpoint);
+ }
+
+ public void entryAdded(Endpoint value) {
+ ((RuntimeEndpoint)value).bind(registry, this);
+ for (EndpointListener listener : listeners) {
+ listener.endpointAdded(value);
+ }
+ }
+
+ public void entryRemoved(Endpoint value) {
+ for (EndpointListener listener : listeners) {
+ listener.endpointRemoved(value);
+ }
+ }
+
+ public void entryUpdated(Endpoint oldEp, Endpoint newEp) {
+ ((RuntimeEndpoint)newEp).bind(registry, this);
+ for (EndpointListener listener : listeners) {
+ listener.endpointUpdated(oldEp, newEp);
+ }
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ super.process(event);
+ String path = event.getPath();
+ if (path == null || !path.startsWith(getPath(root))) {
+ return;
+ }
+ switch (event.getType()) {
+ case NodeChildrenChanged:
+ break;
+ case NodeCreated:
+ case NodeDataChanged:
+ Endpoint ep = getData(path);
+ entryAdded(ep);
+ break;
+ case NodeDeleted:
+ entryRemoved(null);
+ break;
+ }
+ }
+
+ public List<EndpointReference> getEndpointReferences() {
+ return endpointreferences;
+ }
+
+ public List<Endpoint> findEndpoint(String uri) {
+ List<Endpoint> endpoints = new ArrayList<Endpoint>();
+ lookup(endpoints, uri);
+ return endpoints;
+ }
+
+ public String getDomainURI() {
+ return domainURI;
+ }
+
+}
diff --git a/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/LocalZooKeeperServer.java b/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/LocalZooKeeperServer.java
new file mode 100644
index 0000000000..49bb46f983
--- /dev/null
+++ b/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/LocalZooKeeperServer.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.zookeeper;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+
+/**
+ * This class starts and runs a standalone ZooKeeperServer.
+ */
+public class LocalZooKeeperServer {
+ private static final Logger logger = Logger.getLogger(LocalZooKeeperServer.class.getName());
+ private static final String USAGE = "Usage: LocalZooKeeperServer configfile | port datadir [ticktime] [maxcnxns]";
+ private NIOServerCnxn.Factory cnxnFactory;
+ private ZooKeeperServer zkServer;
+
+ /**
+ *
+ */
+ public LocalZooKeeperServer() {
+ super();
+ }
+
+ public Thread folk(final ServerConfig config) {
+ Thread thread = new Thread() {
+ public void run() {
+ try {
+ LocalZooKeeperServer.this.run(config);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ };
+ thread.start();
+ return thread;
+ }
+
+ public Thread folk(final String[] args) {
+ Thread thread = new Thread() {
+ public void run() {
+ try {
+ LocalZooKeeperServer.this.run(args);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ };
+ thread.start();
+ return thread;
+ }
+
+ /**
+ * Run from a ServerConfig.
+ * @param config ServerConfig to use.
+ * @throws IOException
+ */
+ public void run(ServerConfig config) throws IOException {
+ logger.info("Starting ZooKeeper server");
+ try {
+ // Note that this thread isn't going to be doing anything else,
+ // so rather than spawning another thread, we will just call
+ // run() in this thread.
+ // create a file logger url from the command line args
+ zkServer = new ZooKeeperServer();
+
+ FileTxnSnapLog ftxn = new FileTxnSnapLog(new File(config.getDataLogDir()), new File(config.getDataDir()));
+ zkServer.setTxnLogFactory(ftxn);
+ zkServer.setTickTime(config.getTickTime());
+ cnxnFactory = new NIOServerCnxn.Factory(config.getClientPort(), config.getMaxClientCnxns());
+ cnxnFactory.startup(zkServer);
+ cnxnFactory.join();
+ if (zkServer.isRunning()) {
+ zkServer.shutdown();
+ }
+ } catch (InterruptedException e) {
+ // warn, but generally this is ok
+ logger.log(Level.WARNING, "Server interrupted", e);
+ }
+ }
+
+ public void run(String[] args) throws ConfigException, IOException {
+ ServerConfig config = new ServerConfig();
+ if (args.length == 1) {
+ config.parse(args[0]);
+ } else {
+ config.parse(args);
+ }
+ run(config);
+ }
+
+ /**
+ * Shutdown the serving instance
+ */
+ public void shutdown() {
+ cnxnFactory.shutdown();
+ }
+
+ /*
+ * Start up the ZooKeeper server.
+ *
+ * @param args the configfile or the port datadir [ticktime]
+ */
+ public static void main(String[] args) {
+ LocalZooKeeperServer main = new LocalZooKeeperServer();
+ try {
+ main.run(args);
+ } catch (IllegalArgumentException e) {
+ logger.log(Level.SEVERE, "Invalid arguments, exiting abnormally", e);
+ logger.info(USAGE);
+ System.err.println(USAGE);
+ System.exit(2);
+ } catch (ConfigException e) {
+ logger.log(Level.SEVERE, "Invalid config, exiting abnormally", e);
+ System.err.println("Invalid config, exiting abnormally");
+ System.exit(2);
+ } catch (Exception e) {
+ logger.log(Level.SEVERE, "Unexpected exception, exiting abnormally", e);
+ System.exit(1);
+ }
+ logger.info("Exiting normally");
+ System.exit(0);
+ }
+}
diff --git a/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/ZooKeeperDomainRegistryFactory.java b/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/ZooKeeperDomainRegistryFactory.java
new file mode 100644
index 0000000000..2f9dcfc27f
--- /dev/null
+++ b/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/ZooKeeperDomainRegistryFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.zookeeper;
+
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.runtime.BaseDomainRegistryFactory;
+import org.apache.tuscany.sca.runtime.EndpointRegistry;
+
+/**
+ * The utility responsible for finding the endpoint regstry by the scheme and creating instances for the
+ * given domain
+ */
+public class ZooKeeperDomainRegistryFactory extends BaseDomainRegistryFactory {
+ private final static String[] schemes = new String[] {"zookeeper"};
+
+ /**
+ * @param extensionRegistry
+ */
+ public ZooKeeperDomainRegistryFactory(ExtensionPointRegistry registry) {
+ super(registry);
+ }
+
+ protected EndpointRegistry createEndpointRegistry(String endpointRegistryURI, String domainURI) {
+ EndpointRegistry endpointRegistry =
+ new DistributedRegistry(registry, null, endpointRegistryURI, domainURI);
+ return endpointRegistry;
+ }
+
+ public String[] getSupportedSchemes() {
+ return schemes;
+ }
+}
diff --git a/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/ZooKeeperHelper.java b/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/ZooKeeperHelper.java
new file mode 100644
index 0000000000..e92df3d91d
--- /dev/null
+++ b/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/ZooKeeperHelper.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.zookeeper;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.ZooKeeperServer.DataTreeBuilder;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+/**
+ *
+ */
+public class ZooKeeperHelper implements Watcher {
+ public ZooKeeper connect(String host, int port, int timeout) throws IOException {
+ return new ZooKeeper(host + ":" + port, timeout, this);
+ }
+
+ public String getName(String uri) {
+ String name = uri.replace("$", "$$");
+ return name.replace('/', '$');
+ }
+
+ private byte[] getBytes(String str) {
+ try {
+ return str.getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ public String join(ZooKeeper zooKeeper, String domainURI, String nodeURI) throws KeeperException,
+ InterruptedException {
+ String domain = "/" + getName(domainURI);
+ Stat stat = zooKeeper.exists(domain, false);
+ String path = domain;
+ if (stat == null) {
+ path = zooKeeper.create(domain, getBytes(domainURI), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ path = path + "/" + getName(nodeURI);
+ stat = zooKeeper.exists(path, false);
+ if (stat != null) {
+ return zooKeeper.create(path, getBytes(nodeURI), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ } else {
+ return path;
+ }
+ }
+
+ public String getPath(String... uris) {
+ StringBuffer buffer = new StringBuffer();
+ for (String uri : uris) {
+ buffer.append('/').append(getName(uri));
+ }
+ return buffer.toString();
+ }
+
+ public void put(ZooKeeper zooKeeper, String domainURI, String nodeURI, String endpointURI, byte[] endpoint)
+ throws KeeperException, InterruptedException {
+ String path = join(zooKeeper, domainURI, nodeURI);
+ path = path + "/" + getName(endpointURI);
+ Stat stat = zooKeeper.exists(path, false);
+ if (stat == null) {
+ zooKeeper.create(path, endpoint, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ } else {
+ zooKeeper.setData(path, endpoint, -1);
+ }
+ }
+
+ public void remove(ZooKeeper zooKeeper, String domainURI, String nodeURI, String endpointURI)
+ throws KeeperException, InterruptedException {
+ String path = getPath(domainURI, nodeURI, endpointURI);
+ zooKeeper.delete(path, -1);
+ }
+
+ public void remove(ZooKeeper zooKeeper, String domainURI, String nodeURI) throws InterruptedException,
+ KeeperException {
+ String node = getPath(domainURI, nodeURI);
+ zooKeeper.delete(node, -1);
+ }
+
+ public byte[] get(ZooKeeper zooKeeper, String domainURI, String nodeURI, String endpointURI)
+ throws KeeperException, InterruptedException {
+ String path = getPath(domainURI, nodeURI, endpointURI);
+ return zooKeeper.getData(path, false, new Stat());
+ }
+
+ public List<byte[]> get(ZooKeeper zooKeeper, String domainURI, String nodeURI) throws KeeperException,
+ InterruptedException {
+ String node = getPath(domainURI, nodeURI);
+ List<String> endpoints = zooKeeper.getChildren(node, false);
+ List<byte[]> data = new ArrayList<byte[]>();
+ for (String endpoint : endpoints) {
+ Stat stat = new Stat();
+ data.add(zooKeeper.getData(endpoint, false, stat));
+ }
+ return data;
+ }
+
+ public List<byte[]> get(ZooKeeper zooKeeper, String domainURI) throws KeeperException, InterruptedException {
+ String path = getPath(domainURI);
+ List<String> nodes = zooKeeper.getChildren(path, false);
+ List<byte[]> data = new ArrayList<byte[]>();
+ for (String node : nodes) {
+ List<String> endpoints = zooKeeper.getChildren(node, false);
+ for (String endpoint : endpoints) {
+ Stat stat = new Stat();
+ data.add(zooKeeper.getData(endpoint, false, stat));
+ }
+ }
+ return data;
+ }
+
+ public ZooKeeper create(String connectString, int timeout, long sessionId, byte[] password) throws IOException {
+ return new ZooKeeper(connectString, timeout, this, sessionId, password);
+ }
+
+ public void process(WatchedEvent event) {
+ System.out.println(event);
+ }
+
+ public ZooKeeperServer createServer(File dataDir, File snapDir, int tickTime) throws IOException {
+ FileTxnSnapLog log = new FileTxnSnapLog(dataDir, snapDir);
+ DataTreeBuilder builder = new ZooKeeperServer.BasicDataTreeBuilder();
+ return new ZooKeeperServer(log, tickTime, builder);
+ }
+
+ public void put(ZooKeeper zooKeeper, String key, Object value) throws KeeperException, InterruptedException {
+ byte[] data = serialize(value);
+ List<ACL> acls = Collections.emptyList();
+ zooKeeper.create(key, data, acls, CreateMode.PERSISTENT);
+ }
+
+ public byte[] serialize(Object value) {
+ return null;
+ }
+
+
+
+ public static void main(final String[] args) throws Exception {
+ final String options[] =
+ args.length != 0 ? args
+ : new String[] {"9999", System.getProperty("java.io.tmpdir") + File.separator + "zookeeper"};
+ Thread thread = new Thread() {
+ public void run() {
+ ZooKeeperServerMain.main(options);
+ }
+ };
+ thread.start();
+ Thread.sleep(1000);
+ ZooKeeper zooKeeper = new ZooKeeperHelper().connect("localhost", 9999, 500);
+ System.out.println(zooKeeper.getSessionId());
+ try {
+ String data = new String(zooKeeper.getData("/x", false, null));
+ System.out.println(data);
+ } catch (KeeperException e) {
+ if (e.code() == Code.NONODE) {
+ zooKeeper.create("/x", "X".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ }
+ zooKeeper.create("/x/y", "XY".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ Thread.sleep(1000);
+ String data = new String(zooKeeper.getData("/x/y", true, null));
+ System.out.println(data);
+ zooKeeper.close();
+ Thread.sleep(500);
+ System.exit(0);
+ }
+}
diff --git a/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.DomainRegistryFactory b/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.DomainRegistryFactory
new file mode 100644
index 0000000000..42458b8910
--- /dev/null
+++ b/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.DomainRegistryFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+org.apache.tuscany.sca.endpoint.zookeeper.ZooKeeperDomainRegistryFactory;ranking=20 \ No newline at end of file
diff --git a/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/test/java/org/apache/tuscany/sca/endpoint/zookeeper/DistributedMapTestCase.java b/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/test/java/org/apache/tuscany/sca/endpoint/zookeeper/DistributedMapTestCase.java
new file mode 100644
index 0000000000..325f060645
--- /dev/null
+++ b/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/test/java/org/apache/tuscany/sca/endpoint/zookeeper/DistributedMapTestCase.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.zookeeper;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.server.PurgeTxnLog;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class DistributedMapTestCase implements Watcher {
+
+ private static LocalZooKeeperServer server;
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ server = new LocalZooKeeperServer();
+ }
+
+ @Test
+ public void testMap() throws Exception {
+ String[] args = new String[] {"8085", "target/zookeeper"};
+ server.folk(args);
+ ZooKeeper client = new ZooKeeper("localhost:8085", 500, this);
+ synchronized (this) {
+ wait(10000);
+ }
+ try {
+ AbstractDistributedMap<String> map =
+ new AbstractDistributedMap<String>(client, "testMap", getClass().getClassLoader());
+ map.start();
+ map.clear();
+ String value = map.put("1", "A");
+ Assert.assertNull(value);
+ value = map.put("2", "B");
+ Assert.assertNull(value);
+ value = map.put("2", "C");
+ Assert.assertEquals("B", value);
+ value = map.get("2");
+ Assert.assertEquals("C", value);
+ value = map.remove("2");
+ Assert.assertEquals("C", value);
+ map.put("3", "D");
+ Map<String, String> map1 = new HashMap<String, String>();
+ map1.put("4", "E");
+ map1.put("5", "F");
+ map.putAll(map1);
+ System.out.println(map);
+ Assert.assertTrue(map.containsKey("5"));
+ Assert.assertTrue(map.containsValue("F"));
+ map.clear();
+ Assert.assertEquals(0, map.size());
+ } finally {
+ client.close();
+ }
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ if (server != null) {
+ server.shutdown();
+ File dir = new File("target/zookeeper");
+ PurgeTxnLog.purge(dir, dir, 3);
+ }
+ }
+
+ public void process(WatchedEvent event) {
+ System.out.println(event);
+ if (event.getPath() == null && event.getState() == KeeperState.SyncConnected) {
+ synchronized (this) {
+ notifyAll();
+ }
+ }
+ }
+
+}
diff --git a/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/test/java/org/apache/tuscany/sca/endpoint/zookeeper/LocalZooKeeperServerTestCase.java b/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/test/java/org/apache/tuscany/sca/endpoint/zookeeper/LocalZooKeeperServerTestCase.java
new file mode 100644
index 0000000000..51a4d7455e
--- /dev/null
+++ b/sca-java-2.x/contrib/modules/endpoint-zookeeper/src/test/java/org/apache/tuscany/sca/endpoint/zookeeper/LocalZooKeeperServerTestCase.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.zookeeper;
+
+import java.io.File;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.server.PurgeTxnLog;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class LocalZooKeeperServerTestCase implements Watcher {
+ private static LocalZooKeeperServer server;
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ server = new LocalZooKeeperServer();
+ }
+
+ @Test
+ public void testServer() throws Exception {
+ String[] args = new String[] {"8085", "target/zookeeper"};
+ Thread thread = server.folk(args);
+ ZooKeeper client = new ZooKeeper("localhost:8085", 500, this);
+ synchronized (this) {
+ wait(10000);
+ }
+ client.create("/test", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ byte[] data = client.getData("/test", false, null);
+ Assert.assertEquals("123", new String(data));
+ client.close();
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ if (server != null) {
+ server.shutdown();
+ File dir = new File("target/zookeeper");
+ PurgeTxnLog.purge(dir, dir, 3);
+ }
+ }
+
+ public void process(WatchedEvent event) {
+ System.out.println(event);
+ if (event.getPath() == null && event.getState() == KeeperState.SyncConnected) {
+ synchronized (this) {
+ notifyAll();
+ }
+ }
+ }
+
+}