From a3c48da9bb8971497d414f86e352123d95b9c3da Mon Sep 17 00:00:00 2001 From: lresende Date: Fri, 20 Nov 2009 23:53:35 +0000 Subject: Moving 2.x trunk git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@882795 13f79535-47bb-0310-9956-ffa450edef68 --- .../endpoint/zookeeper/AbstractDistributedMap.java | 297 +++++++++++++++++++++ .../endpoint/zookeeper/DistributedRegistry.java | 285 ++++++++++++++++++++ .../endpoint/zookeeper/LocalZooKeeperServer.java | 148 ++++++++++ .../sca/endpoint/zookeeper/ZooKeeperHelper.java | 200 ++++++++++++++ 4 files changed, 930 insertions(+) create mode 100644 sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java create mode 100644 sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/DistributedRegistry.java create mode 100644 sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/LocalZooKeeperServer.java create mode 100644 sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/ZooKeeperHelper.java (limited to 'sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca') diff --git a/sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java b/sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java new file mode 100644 index 0000000000..d5b8e74631 --- /dev/null +++ b/sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.endpoint.zookeeper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.ObjectStreamClass; +import java.util.AbstractMap; +import java.util.AbstractSet; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.Stat; +import org.oasisopen.sca.ServiceRuntimeException; + +/** + * + */ +public class AbstractDistributedMap extends AbstractMap implements Map, Watcher { + protected ZooKeeper zooKeeper; + protected ClassLoader classLoader; + protected String root; + + /** + * @param zooKeeper + * @param root + * @param classLoader + */ + public AbstractDistributedMap(ZooKeeper zooKeeper, String root, ClassLoader classLoader) { + super(); + this.zooKeeper = zooKeeper; + this.root = root; + this.classLoader = classLoader; + } + + public void start() { + // FIXME: + this.zooKeeper.register(this); + try { + String path = getPath(root); + Stat stat = zooKeeper.exists(path, false); + if (stat == null) { + zooKeeper.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + } catch (Exception e) { + throw new IllegalArgumentException(e); + } + } + + @Override + public Set> entrySet() { + String path = getPath(root); + List children = Collections.emptyList(); + try { + try { + children = zooKeeper.getChildren(path, false); + } catch (KeeperException e) { + if (e.code() == Code.NONODE) { + return Collections.emptySet(); + } else { + throw e; + } + } + } catch (Throwable e) { + throw new ServiceRuntimeException(e); + } + return new EntrySet(children); + } + + protected String getName(Object key) { + String name = String.valueOf(key); + name = name.replace("$", "$$"); + return name.replace('/', '$'); + } + + public String getPath(String... uris) { + StringBuffer buffer = new StringBuffer(); + for (String uri : uris) { + buffer.append('/').append(getName(uri)); + } + return buffer.toString(); + } + + protected byte[] serialize(V value) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(value); + oos.close(); + return bos.toByteArray(); + } + + protected V deserialize(byte[] bytes, final ClassLoader classLoader) throws IOException, ClassNotFoundException { + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + ObjectInputStream ois = new ObjectInputStream(bis) { + protected Class resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException { + ClassLoader loader = classLoader; + if (loader == null) { + loader = Thread.currentThread().getContextClassLoader(); + } + try { + return Class.forName(desc.getName(), false, loader); + } catch (ClassNotFoundException e) { + return super.resolveClass(desc); + } + } + }; + return (V)ois.readObject(); + } + + @Override + public V get(Object key) { + String path = getPath(root, getName(key)); + return getData(path); + } + + protected V getData(String path) { + try { + Stat stat = new Stat(); + byte[] data = zooKeeper.getData(path, false, stat); + return deserialize(data, classLoader); + } catch (KeeperException e) { + if (e.code() == Code.NONODE) { + return null; + } else { + throw new ServiceRuntimeException(e); + } + } catch (Throwable e) { + throw new ServiceRuntimeException(e); + } + + } + + @Override + public V put(String key, V value) { + try { + String path = getPath(root, getName(key)); + Stat stat = new Stat(); + byte[] data = serialize(value); + + try { + byte[] oldData = zooKeeper.getData(path, false, stat); + zooKeeper.setData(path, data, -1); + return deserialize(oldData, classLoader); + } catch (KeeperException e) { + if (e.code() == Code.NONODE) { + zooKeeper.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + return null; + } else { + throw e; + } + } + } catch (Throwable e) { + throw new ServiceRuntimeException(e); + } + + } + + @Override + public V remove(Object key) { + try { + String path = getPath(root, getName(key)); + try { + Stat stat = new Stat(); + byte[] oldData = zooKeeper.getData(path, false, stat); + zooKeeper.delete(path, -1); + return deserialize(oldData, classLoader); + } catch (KeeperException e) { + if (e.code() == Code.NONODE) { + return null; + } else { + throw e; + } + } + } catch (Throwable e) { + throw new ServiceRuntimeException(e); + } + + } + + private class MapEntry implements Map.Entry { + private String key; + + /** + * @param key + */ + public MapEntry(String key) { + super(); + this.key = key; + } + + public String getKey() { + return key; + } + + public V getValue() { + try { + try { + byte[] data = zooKeeper.getData(getPath(root, getName(key)), false, new Stat()); + return deserialize(data, classLoader); + } catch (KeeperException e) { + if (e.code() == Code.NONODE) { + return null; + } else { + throw e; + } + } + } catch (Throwable e) { + throw new ServiceRuntimeException(e); + } + } + + public V setValue(V value) { + return put(key, value); + } + } + + private class EntrySet extends AbstractSet> { + private final int size; + private final Iterator childrenIterator; + + /** + * @param size + * @param childrenIterator + */ + public EntrySet(Collection children) { + super(); + this.size = children.size(); + this.childrenIterator = children.iterator(); + } + + @Override + public Iterator> iterator() { + return new Iterator>() { + private String path; + + public boolean hasNext() { + return childrenIterator.hasNext(); + } + + public Map.Entry next() { + path = childrenIterator.next(); + return new MapEntry(path); + } + + public void remove() { + childrenIterator.remove(); + try { + zooKeeper.delete(getPath(root, path), -1); + } catch (Throwable e) { + throw new ServiceRuntimeException(e); + } + } + }; + + } + + @Override + public int size() { + return size; + } + } + + public void process(WatchedEvent event) { + System.out.println(event); + } +} diff --git a/sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/DistributedRegistry.java b/sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/DistributedRegistry.java new file mode 100644 index 0000000000..b6ec34fe1f --- /dev/null +++ b/sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/DistributedRegistry.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.endpoint.zookeeper; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.logging.Logger; + +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.assembly.EndpointReference; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.LifeCycleListener; +import org.apache.tuscany.sca.runtime.EndpointListener; +import org.apache.tuscany.sca.runtime.EndpointRegistry; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.ZooKeeper; +import org.oasisopen.sca.ServiceRuntimeException; + +/** + * + */ +public class DistributedRegistry extends AbstractDistributedMap implements EndpointRegistry, + LifeCycleListener { + + private static final Logger logger = Logger.getLogger(DistributedRegistry.class.getName()); + private List listeners = new CopyOnWriteArrayList(); + private List endpointreferences = new CopyOnWriteArrayList(); + + private ExtensionPointRegistry registry; + private String domainURI; + private String registryURI; + + /** + * + */ + public DistributedRegistry(ExtensionPointRegistry registry, + Map attributes, + String domainRegistryURI, + String domainURI) { + super(null, null, null); + this.domainURI = domainURI; + this.registryURI = domainRegistryURI; + Map config = parseURI(attributes, domainRegistryURI); + } + + public void start() { + try { + zooKeeper = new ZooKeeper(registryURI, 100, null); + } catch (IOException e) { + throw new ServiceRuntimeException(e); + } + } + + public void stop() { + if (zooKeeper != null) { + try { + zooKeeper.close(); + } catch (InterruptedException e) { + throw new ServiceRuntimeException(e); + } + zooKeeper = null; + } + } + + private Map parseURI(Map attributes, String domainRegistryURI) { + Map map = new HashMap(); + if (attributes != null) { + map.putAll(attributes); + } + // Should be zookeeper:host1:port1,host2:port2?sessionTimeout=100 + int index = domainRegistryURI.indexOf(':'); + String path = domainRegistryURI.substring(index + 1); + + index = path.indexOf('?'); + if (index == -1) { + map.put("hosts", path); + return map; + } + map.put("hosts", path.substring(0, index)); + String query = path.substring(index + 1); + try { + query = URLDecoder.decode(query, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new IllegalArgumentException(e); + } + String[] params = query.split("&"); + for (String param : params) { + index = param.indexOf('='); + if (index != -1) { + map.put(param.substring(0, index), param.substring(index + 1)); + } + } + return map; + } + + public void addEndpoint(Endpoint endpoint) { + put(endpoint.getURI(), endpoint); + logger.info("Add endpoint - " + endpoint); + } + + public void addEndpointReference(EndpointReference endpointReference) { + endpointreferences.add(endpointReference); + logger.fine("Add endpoint reference - " + endpointReference); + } + + public void addListener(EndpointListener listener) { + listeners.add(listener); + } + + /** + * Parse the component/service/binding URI into an array of parts (componentURI, serviceName, bindingName) + * @param uri + * @return + */ + private String[] parse(String uri) { + String[] names = new String[3]; + int index = uri.lastIndexOf('#'); + if (index == -1) { + names[0] = uri; + } else { + names[0] = uri.substring(0, index); + String str = uri.substring(index + 1); + if (str.startsWith("service-binding(") && str.endsWith(")")) { + str = str.substring("service-binding(".length(), str.length() - 1); + String[] parts = str.split("/"); + if (parts.length != 2) { + throw new IllegalArgumentException("Invalid service-binding URI: " + uri); + } + names[1] = parts[0]; + names[2] = parts[1]; + } else if (str.startsWith("service(") && str.endsWith(")")) { + str = str.substring("service(".length(), str.length() - 1); + names[1] = str; + } else { + throw new IllegalArgumentException("Invalid component/service/binding URI: " + uri); + } + } + return names; + } + + private boolean matches(String target, String uri) { + String[] parts1 = parse(target); + String[] parts2 = parse(uri); + for (int i = 0; i < parts1.length; i++) { + if (parts1[i] == null || parts1[i].equals(parts2[i])) { + continue; + } else { + return false; + } + } + return true; + } + + public List findEndpoint(EndpointReference endpointReference) { + List foundEndpoints = new ArrayList(); + + logger.fine("Find endpoint for reference - " + endpointReference); + + if (endpointReference.getReference() != null) { + Endpoint targetEndpoint = endpointReference.getTargetEndpoint(); + for (Object v : values()) { + Endpoint endpoint = (Endpoint)v; + // TODO: implement more complete matching + logger.fine("Matching against - " + endpoint); + if (matches(targetEndpoint.getURI(), endpoint.getURI())) { + // if (!entry.isPrimary()) { + ((RuntimeEndpoint) endpoint).bind(registry, this); + // } + foundEndpoints.add(endpoint); + logger.fine("Found endpoint with matching service - " + endpoint); + } + // else the service name doesn't match + } + } + return foundEndpoints; + } + + public List findEndpointReference(Endpoint endpoint) { + return endpointreferences; + } + + public Endpoint getEndpoint(String uri) { + return get(uri); + } + + public List getEndpointRefereneces() { + return endpointreferences; + } + + public List getEndpoints() { + return new ArrayList(values()); + } + + public List getListeners() { + return listeners; + } + + public void removeEndpoint(Endpoint endpoint) { + remove(endpoint.getURI()); + logger.info("Remove endpoint - " + endpoint); + } + + public void removeEndpointReference(EndpointReference endpointReference) { + endpointreferences.remove(endpointReference); + logger.fine("Remove endpoint reference - " + endpointReference); + } + + public void removeListener(EndpointListener listener) { + listeners.remove(listener); + } + + public void updateEndpoint(String uri, Endpoint endpoint) { + Endpoint oldEndpoint = getEndpoint(uri); + if (oldEndpoint == null) { + throw new IllegalArgumentException("Endpoint is not found: " + uri); + } + put(endpoint.getURI(), endpoint); + } + + public void entryAdded(Endpoint value) { + ((RuntimeEndpoint) value).bind(registry, this); + for (EndpointListener listener : listeners) { + listener.endpointAdded(value); + } + } + + public void entryRemoved(Endpoint value) { + for (EndpointListener listener : listeners) { + listener.endpointRemoved(value); + } + } + + public void entryUpdated(Endpoint oldEp, Endpoint newEp) { + ((RuntimeEndpoint) newEp).bind(registry, this); + for (EndpointListener listener : listeners) { + listener.endpointUpdated(oldEp, newEp); + } + } + + @Override + public void process(WatchedEvent event) { + super.process(event); + String path = event.getPath(); + if (path == null || !path.startsWith(getPath(root))) { + return; + } + switch (event.getType()) { + case NodeChildrenChanged: + break; + case NodeCreated: + case NodeDataChanged: + Endpoint ep = getData(path); + entryAdded(ep); + break; + case NodeDeleted: + entryRemoved(null); + break; + } + } + +} diff --git a/sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/LocalZooKeeperServer.java b/sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/LocalZooKeeperServer.java new file mode 100644 index 0000000000..49bb46f983 --- /dev/null +++ b/sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/LocalZooKeeperServer.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.endpoint.zookeeper; + +import java.io.File; +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.zookeeper.server.NIOServerCnxn; +import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; + +/** + * This class starts and runs a standalone ZooKeeperServer. + */ +public class LocalZooKeeperServer { + private static final Logger logger = Logger.getLogger(LocalZooKeeperServer.class.getName()); + private static final String USAGE = "Usage: LocalZooKeeperServer configfile | port datadir [ticktime] [maxcnxns]"; + private NIOServerCnxn.Factory cnxnFactory; + private ZooKeeperServer zkServer; + + /** + * + */ + public LocalZooKeeperServer() { + super(); + } + + public Thread folk(final ServerConfig config) { + Thread thread = new Thread() { + public void run() { + try { + LocalZooKeeperServer.this.run(config); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + }; + thread.start(); + return thread; + } + + public Thread folk(final String[] args) { + Thread thread = new Thread() { + public void run() { + try { + LocalZooKeeperServer.this.run(args); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + }; + thread.start(); + return thread; + } + + /** + * Run from a ServerConfig. + * @param config ServerConfig to use. + * @throws IOException + */ + public void run(ServerConfig config) throws IOException { + logger.info("Starting ZooKeeper server"); + try { + // Note that this thread isn't going to be doing anything else, + // so rather than spawning another thread, we will just call + // run() in this thread. + // create a file logger url from the command line args + zkServer = new ZooKeeperServer(); + + FileTxnSnapLog ftxn = new FileTxnSnapLog(new File(config.getDataLogDir()), new File(config.getDataDir())); + zkServer.setTxnLogFactory(ftxn); + zkServer.setTickTime(config.getTickTime()); + cnxnFactory = new NIOServerCnxn.Factory(config.getClientPort(), config.getMaxClientCnxns()); + cnxnFactory.startup(zkServer); + cnxnFactory.join(); + if (zkServer.isRunning()) { + zkServer.shutdown(); + } + } catch (InterruptedException e) { + // warn, but generally this is ok + logger.log(Level.WARNING, "Server interrupted", e); + } + } + + public void run(String[] args) throws ConfigException, IOException { + ServerConfig config = new ServerConfig(); + if (args.length == 1) { + config.parse(args[0]); + } else { + config.parse(args); + } + run(config); + } + + /** + * Shutdown the serving instance + */ + public void shutdown() { + cnxnFactory.shutdown(); + } + + /* + * Start up the ZooKeeper server. + * + * @param args the configfile or the port datadir [ticktime] + */ + public static void main(String[] args) { + LocalZooKeeperServer main = new LocalZooKeeperServer(); + try { + main.run(args); + } catch (IllegalArgumentException e) { + logger.log(Level.SEVERE, "Invalid arguments, exiting abnormally", e); + logger.info(USAGE); + System.err.println(USAGE); + System.exit(2); + } catch (ConfigException e) { + logger.log(Level.SEVERE, "Invalid config, exiting abnormally", e); + System.err.println("Invalid config, exiting abnormally"); + System.exit(2); + } catch (Exception e) { + logger.log(Level.SEVERE, "Unexpected exception, exiting abnormally", e); + System.exit(1); + } + logger.info("Exiting normally"); + System.exit(0); + } +} diff --git a/sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/ZooKeeperHelper.java b/sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/ZooKeeperHelper.java new file mode 100644 index 0000000000..e92df3d91d --- /dev/null +++ b/sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/ZooKeeperHelper.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.endpoint.zookeeper; + +import java.io.File; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.apache.zookeeper.server.ZooKeeperServer.DataTreeBuilder; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; + +/** + * + */ +public class ZooKeeperHelper implements Watcher { + public ZooKeeper connect(String host, int port, int timeout) throws IOException { + return new ZooKeeper(host + ":" + port, timeout, this); + } + + public String getName(String uri) { + String name = uri.replace("$", "$$"); + return name.replace('/', '$'); + } + + private byte[] getBytes(String str) { + try { + return str.getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new IllegalArgumentException(e); + } + } + + public String join(ZooKeeper zooKeeper, String domainURI, String nodeURI) throws KeeperException, + InterruptedException { + String domain = "/" + getName(domainURI); + Stat stat = zooKeeper.exists(domain, false); + String path = domain; + if (stat == null) { + path = zooKeeper.create(domain, getBytes(domainURI), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + path = path + "/" + getName(nodeURI); + stat = zooKeeper.exists(path, false); + if (stat != null) { + return zooKeeper.create(path, getBytes(nodeURI), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + } else { + return path; + } + } + + public String getPath(String... uris) { + StringBuffer buffer = new StringBuffer(); + for (String uri : uris) { + buffer.append('/').append(getName(uri)); + } + return buffer.toString(); + } + + public void put(ZooKeeper zooKeeper, String domainURI, String nodeURI, String endpointURI, byte[] endpoint) + throws KeeperException, InterruptedException { + String path = join(zooKeeper, domainURI, nodeURI); + path = path + "/" + getName(endpointURI); + Stat stat = zooKeeper.exists(path, false); + if (stat == null) { + zooKeeper.create(path, endpoint, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + } else { + zooKeeper.setData(path, endpoint, -1); + } + } + + public void remove(ZooKeeper zooKeeper, String domainURI, String nodeURI, String endpointURI) + throws KeeperException, InterruptedException { + String path = getPath(domainURI, nodeURI, endpointURI); + zooKeeper.delete(path, -1); + } + + public void remove(ZooKeeper zooKeeper, String domainURI, String nodeURI) throws InterruptedException, + KeeperException { + String node = getPath(domainURI, nodeURI); + zooKeeper.delete(node, -1); + } + + public byte[] get(ZooKeeper zooKeeper, String domainURI, String nodeURI, String endpointURI) + throws KeeperException, InterruptedException { + String path = getPath(domainURI, nodeURI, endpointURI); + return zooKeeper.getData(path, false, new Stat()); + } + + public List get(ZooKeeper zooKeeper, String domainURI, String nodeURI) throws KeeperException, + InterruptedException { + String node = getPath(domainURI, nodeURI); + List endpoints = zooKeeper.getChildren(node, false); + List data = new ArrayList(); + for (String endpoint : endpoints) { + Stat stat = new Stat(); + data.add(zooKeeper.getData(endpoint, false, stat)); + } + return data; + } + + public List get(ZooKeeper zooKeeper, String domainURI) throws KeeperException, InterruptedException { + String path = getPath(domainURI); + List nodes = zooKeeper.getChildren(path, false); + List data = new ArrayList(); + for (String node : nodes) { + List endpoints = zooKeeper.getChildren(node, false); + for (String endpoint : endpoints) { + Stat stat = new Stat(); + data.add(zooKeeper.getData(endpoint, false, stat)); + } + } + return data; + } + + public ZooKeeper create(String connectString, int timeout, long sessionId, byte[] password) throws IOException { + return new ZooKeeper(connectString, timeout, this, sessionId, password); + } + + public void process(WatchedEvent event) { + System.out.println(event); + } + + public ZooKeeperServer createServer(File dataDir, File snapDir, int tickTime) throws IOException { + FileTxnSnapLog log = new FileTxnSnapLog(dataDir, snapDir); + DataTreeBuilder builder = new ZooKeeperServer.BasicDataTreeBuilder(); + return new ZooKeeperServer(log, tickTime, builder); + } + + public void put(ZooKeeper zooKeeper, String key, Object value) throws KeeperException, InterruptedException { + byte[] data = serialize(value); + List acls = Collections.emptyList(); + zooKeeper.create(key, data, acls, CreateMode.PERSISTENT); + } + + public byte[] serialize(Object value) { + return null; + } + + + + public static void main(final String[] args) throws Exception { + final String options[] = + args.length != 0 ? args + : new String[] {"9999", System.getProperty("java.io.tmpdir") + File.separator + "zookeeper"}; + Thread thread = new Thread() { + public void run() { + ZooKeeperServerMain.main(options); + } + }; + thread.start(); + Thread.sleep(1000); + ZooKeeper zooKeeper = new ZooKeeperHelper().connect("localhost", 9999, 500); + System.out.println(zooKeeper.getSessionId()); + try { + String data = new String(zooKeeper.getData("/x", false, null)); + System.out.println(data); + } catch (KeeperException e) { + if (e.code() == Code.NONODE) { + zooKeeper.create("/x", "X".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + } + zooKeeper.create("/x/y", "XY".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + Thread.sleep(1000); + String data = new String(zooKeeper.getData("/x/y", true, null)); + System.out.println(data); + zooKeeper.close(); + Thread.sleep(500); + System.exit(0); + } +} -- cgit v1.2.3