summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--java/sca/modules/endpoint-zookeeper/META-INF/MANIFEST.MF17
-rw-r--r--java/sca/modules/endpoint-zookeeper/pom.xml76
-rw-r--r--java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java289
-rw-r--r--java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/DistributedRegistry.java172
-rw-r--r--java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/ZooKeeperHelper.java234
-rw-r--r--java/sca/modules/endpoint-zookeeper/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.EndpointRegistry17
6 files changed, 805 insertions, 0 deletions
diff --git a/java/sca/modules/endpoint-zookeeper/META-INF/MANIFEST.MF b/java/sca/modules/endpoint-zookeeper/META-INF/MANIFEST.MF
new file mode 100644
index 0000000000..a60b2ba656
--- /dev/null
+++ b/java/sca/modules/endpoint-zookeeper/META-INF/MANIFEST.MF
@@ -0,0 +1,17 @@
+Manifest-Version: 1.0
+Bundle-Name: Apache Tuscany SCA Tomcat Tribes Based EndPoint Registry
+Created-By: 1.6.0_07 (Sun Microsystems Inc.)
+Bundle-Vendor: The Apache Software Foundation
+Bundle-Version: 2.0.0
+Bundle-ManifestVersion: 2
+Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt
+Bundle-Description: Apache Tuscany SCA XSD Model
+Bundle-SymbolicName: org.apache.tuscany.sca.endpoint.tribes
+Bundle-DocURL: http://www.apache.org/
+Bundle-RequiredExecutionEnvironment: J2SE-1.5,JavaSE-1.6
+Import-Package: org.apache.tuscany.sca.assembly;version="2.0.0",
+ org.apache.tuscany.sca.core.assembly.impl;version="2.0.0";scope=internal;resolution:=optional,
+ org.apache.tuscany.sca.core;version="2.0.0",
+ org.apache.tuscany.sca.management;version="2.0.0",
+ org.apache.tuscany.sca.policy;version="2.0.0",
+ org.apache.tuscany.sca.runtime;version="2.0.0"
diff --git a/java/sca/modules/endpoint-zookeeper/pom.xml b/java/sca/modules/endpoint-zookeeper/pom.xml
new file mode 100644
index 0000000000..cd404ab331
--- /dev/null
+++ b/java/sca/modules/endpoint-zookeeper/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+<project>
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.tuscany.sca</groupId>
+ <artifactId>tuscany-modules</artifactId>
+ <version>2.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <artifactId>tuscany-endpoint-zookeeper</artifactId>
+ <name>Apache Tuscany SCA Hadoop ZooKeeper Based EndPoint Registry</name>
+
+ <!-- http://issues.apache.org/jira/browse/ZOOKEEPER-224 -->
+ <repositories>
+ <repository>
+ <id>zookeeper.repo</id>
+ <url>http://people.apache.org/~chirino/zk-repo/</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.2.1</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.15</version>
+ <scope>runtime</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tuscany.sca</groupId>
+ <artifactId>tuscany-core-spi</artifactId>
+ <version>2.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java b/java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java
new file mode 100644
index 0000000000..94306507ce
--- /dev/null
+++ b/java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.io.UnsupportedEncodingException;
+import java.util.AbstractMap;
+import java.util.AbstractSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+import org.oasisopen.sca.ServiceRuntimeException;
+
+/**
+ *
+ */
+public class AbstractDistributedMap<V> extends AbstractMap<String, V> implements Map<String, V>, Watcher {
+ protected ZooKeeper zooKeeper;
+ /**
+ * @param zooKeeper
+ * @param root
+ * @param classLoader
+ */
+ public AbstractDistributedMap(ZooKeeper zooKeeper, String root, ClassLoader classLoader) {
+ super();
+ this.zooKeeper = zooKeeper;
+ this.root = root;
+ this.classLoader = classLoader;
+ // FIXME:
+ this.zooKeeper.register(this);
+ }
+
+ protected ClassLoader classLoader;
+ protected String root;
+
+ @Override
+ public Set<Map.Entry<String, V>> entrySet() {
+ String path = getPath(root);
+ List<String> children = Collections.emptyList();
+ try {
+ try {
+ children = zooKeeper.getChildren(path, false);
+ } catch (KeeperException e) {
+ if (e.code() == Code.NONODE) {
+ return Collections.emptySet();
+ } else {
+ throw e;
+ }
+ }
+ } catch (Throwable e) {
+ throw new ServiceRuntimeException(e);
+ }
+ return new EntrySet(children);
+ }
+
+ protected String getName(Object key) {
+ String name = String.valueOf(key);
+ name = name.replace("$", "$$");
+ return name.replace('/', '$');
+ }
+
+ public String getPath(String... uris) {
+ StringBuffer buffer = new StringBuffer();
+ for (String uri : uris) {
+ buffer.append('/').append(getName(uri));
+ }
+ return buffer.toString();
+ }
+
+ private byte[] getBytes(String str) {
+ try {
+ return str.getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ protected byte[] serialize(V value) throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeObject(value);
+ oos.close();
+ return bos.toByteArray();
+ }
+
+ protected V deserialize(byte[] bytes, final ClassLoader classLoader) throws IOException, ClassNotFoundException {
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bis) {
+ protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
+ ClassLoader loader = classLoader;
+ if (loader == null) {
+ loader = Thread.currentThread().getContextClassLoader();
+ }
+ try {
+ return Class.forName(desc.getName(), false, loader);
+ } catch (ClassNotFoundException e) {
+ return super.resolveClass(desc);
+ }
+ }
+ };
+ return (V)ois.readObject();
+ }
+
+ @Override
+ public V get(Object key) {
+ String path = getPath(root, getName(key));
+ try {
+ Stat stat = new Stat();
+ byte[] data = zooKeeper.getData(path, false, stat);
+ return deserialize(data, classLoader);
+ } catch (KeeperException e) {
+ if (e.code() == Code.NONODE) {
+ return null;
+ } else {
+ throw new ServiceRuntimeException(e);
+ }
+ } catch (Throwable e) {
+ throw new ServiceRuntimeException(e);
+ }
+ }
+
+ @Override
+ public V put(String key, V value) {
+ try {
+ String path = getPath(root, getName(key));
+ Stat stat = new Stat();
+ byte[] data = serialize(value);
+
+ try {
+ byte[] oldData = zooKeeper.getData(path, false, stat);
+ zooKeeper.setData(path, data, -1);
+ return deserialize(oldData, classLoader);
+ } catch (KeeperException e) {
+ if (e.code() == Code.NONODE) {
+ zooKeeper.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ return null;
+ } else {
+ throw e;
+ }
+ }
+ } catch (Throwable e) {
+ throw new ServiceRuntimeException(e);
+ }
+
+ }
+
+ @Override
+ public V remove(Object key) {
+ try {
+ String path = getPath(root, getName(key));
+ try {
+ Stat stat = new Stat();
+ byte[] oldData = zooKeeper.getData(path, false, stat);
+ zooKeeper.delete(path, -1);
+ return deserialize(oldData, classLoader);
+ } catch (KeeperException e) {
+ if (e.code() == Code.NONODE) {
+ return null;
+ } else {
+ throw e;
+ }
+ }
+ } catch (Throwable e) {
+ throw new ServiceRuntimeException(e);
+ }
+
+ }
+
+ private class MapEntry implements Map.Entry<String, V> {
+ private String key;
+
+ /**
+ * @param key
+ */
+ public MapEntry(String key) {
+ super();
+ this.key = key;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public V getValue() {
+ try {
+ try {
+ byte[] data = zooKeeper.getData(getPath(root, getName(key)), false, new Stat());
+ return deserialize(data, classLoader);
+ } catch (KeeperException e) {
+ if (e.code() == Code.NONODE) {
+ return null;
+ } else {
+ throw e;
+ }
+ }
+ } catch (Throwable e) {
+ throw new ServiceRuntimeException(e);
+ }
+ }
+
+ public V setValue(V value) {
+ return put(key, value);
+ }
+ }
+
+ private class EntrySet extends AbstractSet<Map.Entry<String, V>> {
+ private final int size;
+ private final Iterator<String> childrenIterator;
+
+ /**
+ * @param size
+ * @param childrenIterator
+ */
+ public EntrySet(Collection<String> children) {
+ super();
+ this.size = children.size();
+ this.childrenIterator = children.iterator();
+ }
+
+ @Override
+ public Iterator<Map.Entry<String, V>> iterator() {
+ return new Iterator<Map.Entry<String, V>>() {
+ private String path;
+
+ public boolean hasNext() {
+ return childrenIterator.hasNext();
+ }
+
+ public Map.Entry<String, V> next() {
+ path = childrenIterator.next();
+ return new MapEntry(path);
+ }
+
+ public void remove() {
+ childrenIterator.remove();
+ try {
+ zooKeeper.delete(path, -1);
+ } catch (Throwable e) {
+ throw new ServiceRuntimeException(e);
+ }
+ }
+ };
+
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+ }
+
+ public void process(WatchedEvent event) {
+ System.out.println(event);
+ }
+}
diff --git a/java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/DistributedRegistry.java b/java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/DistributedRegistry.java
new file mode 100644
index 0000000000..1e311da0ba
--- /dev/null
+++ b/java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/DistributedRegistry.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.zookeeper;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.tuscany.sca.assembly.Endpoint;
+import org.apache.tuscany.sca.assembly.EndpointReference;
+import org.apache.tuscany.sca.core.LifeCycleListener;
+import org.apache.tuscany.sca.runtime.EndpointListener;
+import org.apache.tuscany.sca.runtime.EndpointRegistry;
+import org.apache.zookeeper.ZooKeeper;
+import org.oasisopen.sca.ServiceRuntimeException;
+
+/**
+ *
+ */
+public class DistributedRegistry implements EndpointRegistry, LifeCycleListener {
+ private String domainURI;
+ private String registryURI;
+ private ZooKeeper zooKeeper;
+
+ /**
+ *
+ */
+ public DistributedRegistry(String domainURI, String registryURI) {
+ this.domainURI = domainURI;
+ this.registryURI = registryURI;
+ }
+
+ public void start() {
+ try {
+ zooKeeper = new ZooKeeper(registryURI, 100, null);
+ } catch (IOException e) {
+ throw new ServiceRuntimeException(e);
+ }
+ }
+
+ public void stop() {
+ if (zooKeeper != null) {
+ try {
+ zooKeeper.close();
+ } catch (InterruptedException e) {
+ throw new ServiceRuntimeException(e);
+ }
+ zooKeeper = null;
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.tuscany.sca.runtime.EndpointRegistry#addEndpoint(org.apache.tuscany.sca.assembly.Endpoint)
+ */
+ public void addEndpoint(Endpoint endpoint) {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.tuscany.sca.runtime.EndpointRegistry#addEndpointReference(org.apache.tuscany.sca.assembly.EndpointReference)
+ */
+ public void addEndpointReference(EndpointReference endpointReference) {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.tuscany.sca.runtime.EndpointRegistry#addListener(org.apache.tuscany.sca.runtime.EndpointListener)
+ */
+ public void addListener(EndpointListener listener) {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.tuscany.sca.runtime.EndpointRegistry#findEndpoint(org.apache.tuscany.sca.assembly.EndpointReference)
+ */
+ public List<Endpoint> findEndpoint(EndpointReference endpointReference) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.tuscany.sca.runtime.EndpointRegistry#findEndpointReference(org.apache.tuscany.sca.assembly.Endpoint)
+ */
+ public List<EndpointReference> findEndpointReference(Endpoint endpoint) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.tuscany.sca.runtime.EndpointRegistry#getEndpoint(java.lang.String)
+ */
+ public Endpoint getEndpoint(String uri) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.tuscany.sca.runtime.EndpointRegistry#getEndpointRefereneces()
+ */
+ public List<EndpointReference> getEndpointRefereneces() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.tuscany.sca.runtime.EndpointRegistry#getEndpoints()
+ */
+ public List<Endpoint> getEndpoints() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.tuscany.sca.runtime.EndpointRegistry#getListeners()
+ */
+ public List<EndpointListener> getListeners() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.tuscany.sca.runtime.EndpointRegistry#removeEndpoint(org.apache.tuscany.sca.assembly.Endpoint)
+ */
+ public void removeEndpoint(Endpoint endpoint) {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.tuscany.sca.runtime.EndpointRegistry#removeEndpointReference(org.apache.tuscany.sca.assembly.EndpointReference)
+ */
+ public void removeEndpointReference(EndpointReference endpointReference) {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.tuscany.sca.runtime.EndpointRegistry#removeListener(org.apache.tuscany.sca.runtime.EndpointListener)
+ */
+ public void removeListener(EndpointListener listener) {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.tuscany.sca.runtime.EndpointRegistry#updateEndpoint(java.lang.String, org.apache.tuscany.sca.assembly.Endpoint)
+ */
+ public void updateEndpoint(String uri, Endpoint endpoint) {
+ // TODO Auto-generated method stub
+
+ }
+
+}
diff --git a/java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/ZooKeeperHelper.java b/java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/ZooKeeperHelper.java
new file mode 100644
index 0000000000..3f7a5c63be
--- /dev/null
+++ b/java/sca/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/ZooKeeperHelper.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.zookeeper;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.ZooKeeperServer.DataTreeBuilder;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+/**
+ *
+ */
+public class ZooKeeperHelper implements Watcher {
+ public ZooKeeper connect(String host, int port, int timeout) throws IOException {
+ return new ZooKeeper(host + ":" + port, timeout, this);
+ }
+
+ public String getName(String uri) {
+ String name = uri.replace("$", "$$");
+ return name.replace('/', '$');
+ }
+
+ private byte[] getBytes(String str) {
+ try {
+ return str.getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ public String join(ZooKeeper zooKeeper, String domainURI, String nodeURI) throws KeeperException,
+ InterruptedException {
+ String domain = "/" + getName(domainURI);
+ Stat stat = zooKeeper.exists(domain, false);
+ String path = domain;
+ if (stat == null) {
+ path = zooKeeper.create(domain, getBytes(domainURI), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ path = path + "/" + getName(nodeURI);
+ stat = zooKeeper.exists(path, false);
+ if (stat != null) {
+ return zooKeeper.create(path, getBytes(nodeURI), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ } else {
+ return path;
+ }
+ }
+
+ public String getPath(String... uris) {
+ StringBuffer buffer = new StringBuffer();
+ for (String uri : uris) {
+ buffer.append('/').append(getName(uri));
+ }
+ return buffer.toString();
+ }
+
+ public void put(ZooKeeper zooKeeper, String domainURI, String nodeURI, String endpointURI, byte[] endpoint)
+ throws KeeperException, InterruptedException {
+ String path = join(zooKeeper, domainURI, nodeURI);
+ path = path + "/" + getName(endpointURI);
+ Stat stat = zooKeeper.exists(path, false);
+ if (stat == null) {
+ zooKeeper.create(path, endpoint, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ } else {
+ zooKeeper.setData(path, endpoint, -1);
+ }
+ }
+
+ public void remove(ZooKeeper zooKeeper, String domainURI, String nodeURI, String endpointURI)
+ throws KeeperException, InterruptedException {
+ String path = getPath(domainURI, nodeURI, endpointURI);
+ zooKeeper.delete(path, -1);
+ }
+
+ public void remove(ZooKeeper zooKeeper, String domainURI, String nodeURI) throws InterruptedException,
+ KeeperException {
+ String node = getPath(domainURI, nodeURI);
+ zooKeeper.delete(node, -1);
+ }
+
+ public byte[] get(ZooKeeper zooKeeper, String domainURI, String nodeURI, String endpointURI)
+ throws KeeperException, InterruptedException {
+ String path = getPath(domainURI, nodeURI, endpointURI);
+ return zooKeeper.getData(path, false, new Stat());
+ }
+
+ public List<byte[]> get(ZooKeeper zooKeeper, String domainURI, String nodeURI) throws KeeperException,
+ InterruptedException {
+ String node = getPath(domainURI, nodeURI);
+ List<String> endpoints = zooKeeper.getChildren(node, false);
+ List<byte[]> data = new ArrayList<byte[]>();
+ for (String endpoint : endpoints) {
+ Stat stat = new Stat();
+ data.add(zooKeeper.getData(endpoint, false, stat));
+ }
+ return data;
+ }
+
+ public List<byte[]> get(ZooKeeper zooKeeper, String domainURI) throws KeeperException, InterruptedException {
+ String path = getPath(domainURI);
+ List<String> nodes = zooKeeper.getChildren(path, false);
+ List<byte[]> data = new ArrayList<byte[]>();
+ for (String node : nodes) {
+ List<String> endpoints = zooKeeper.getChildren(node, false);
+ for (String endpoint : endpoints) {
+ Stat stat = new Stat();
+ data.add(zooKeeper.getData(endpoint, false, stat));
+ }
+ }
+ return data;
+ }
+
+ public ZooKeeper create(String connectString, int timeout, long sessionId, byte[] password) throws IOException {
+ return new ZooKeeper(connectString, timeout, this, sessionId, password);
+ }
+
+ public void process(WatchedEvent event) {
+ System.out.println(event);
+ }
+
+ public ZooKeeperServer createServer(File dataDir, File snapDir, int tickTime) throws IOException {
+ FileTxnSnapLog log = new FileTxnSnapLog(dataDir, snapDir);
+ DataTreeBuilder builder = new ZooKeeperServer.BasicDataTreeBuilder();
+ return new ZooKeeperServer(log, tickTime, builder);
+ }
+
+ public void put(ZooKeeper zooKeeper, String key, Object value) throws KeeperException, InterruptedException {
+ byte[] data = serialize(value);
+ List<ACL> acls = Collections.emptyList();
+ zooKeeper.create(key, data, acls, CreateMode.PERSISTENT);
+ }
+
+ public byte[] serialize(Object value) {
+ return null;
+ }
+
+ private Map<String, String> parseURI(Map<String, String> attributes, String domainRegistryURI) {
+ Map<String, String> map = new HashMap<String, String>();
+ if (attributes != null) {
+ map.putAll(attributes);
+ }
+ URI uri = URI.create(domainRegistryURI);
+ if (uri.getHost() != null) {
+ map.put("host", uri.getHost());
+ }
+ if (uri.getPort() != -1) {
+ map.put("port", String.valueOf(uri.getPort()));
+ }
+ int index = domainRegistryURI.indexOf('?');
+ if (index == -1) {
+ return map;
+ }
+ String query = domainRegistryURI.substring(index + 1);
+ try {
+ query = URLDecoder.decode(query, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new IllegalArgumentException(e);
+ }
+ String[] params = query.split("&");
+ for (String param : params) {
+ index = param.indexOf('=');
+ if (index != -1) {
+ map.put(param.substring(0, index), param.substring(index + 1));
+ }
+ }
+ return map;
+ }
+
+ public static void main(final String[] args) throws Exception {
+ final String options[] =
+ args.length != 0 ? args
+ : new String[] {"9999", System.getProperty("java.io.tmpdir") + File.separator + "zookeeper"};
+ Thread thread = new Thread() {
+ public void run() {
+ ZooKeeperServerMain.main(options);
+ }
+ };
+ thread.start();
+ Thread.sleep(1000);
+ ZooKeeper zooKeeper = new ZooKeeperHelper().connect("localhost", 9999, 500);
+ System.out.println(zooKeeper.getSessionId());
+ try {
+ String data = new String(zooKeeper.getData("/x", false, null));
+ System.out.println(data);
+ } catch (KeeperException e) {
+ if (e.code() == Code.NONODE) {
+ zooKeeper.create("/x", "X".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ }
+ zooKeeper.create("/x/y", "XY".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ Thread.sleep(1000);
+ String data = new String(zooKeeper.getData("/x/y", true, null));
+ System.out.println(data);
+ zooKeeper.close();
+ Thread.sleep(500);
+ System.exit(0);
+ }
+}
diff --git a/java/sca/modules/endpoint-zookeeper/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.EndpointRegistry b/java/sca/modules/endpoint-zookeeper/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.EndpointRegistry
new file mode 100644
index 0000000000..c17d8c35c5
--- /dev/null
+++ b/java/sca/modules/endpoint-zookeeper/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.EndpointRegistry
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+org.apache.tuscany.sca.endpoint.zookeeper.DistributedRegistry;scheme=zookeeper