summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java
diff options
context:
space:
mode:
Diffstat (limited to 'sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java')
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java297
1 files changed, 297 insertions, 0 deletions
diff --git a/sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java b/sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java
new file mode 100644
index 0000000000..d5b8e74631
--- /dev/null
+++ b/sca-java-2.x/trunk/modules/endpoint-zookeeper/src/main/java/org/apache/tuscany/sca/endpoint/zookeeper/AbstractDistributedMap.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.util.AbstractMap;
+import java.util.AbstractSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+import org.oasisopen.sca.ServiceRuntimeException;
+
+/**
+ *
+ */
+public class AbstractDistributedMap<V> extends AbstractMap<String, V> implements Map<String, V>, Watcher {
+ protected ZooKeeper zooKeeper;
+ protected ClassLoader classLoader;
+ protected String root;
+
+ /**
+ * @param zooKeeper
+ * @param root
+ * @param classLoader
+ */
+ public AbstractDistributedMap(ZooKeeper zooKeeper, String root, ClassLoader classLoader) {
+ super();
+ this.zooKeeper = zooKeeper;
+ this.root = root;
+ this.classLoader = classLoader;
+ }
+
+ public void start() {
+ // FIXME:
+ this.zooKeeper.register(this);
+ try {
+ String path = getPath(root);
+ Stat stat = zooKeeper.exists(path, false);
+ if (stat == null) {
+ zooKeeper.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ } catch (Exception e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ @Override
+ public Set<Map.Entry<String, V>> entrySet() {
+ String path = getPath(root);
+ List<String> children = Collections.emptyList();
+ try {
+ try {
+ children = zooKeeper.getChildren(path, false);
+ } catch (KeeperException e) {
+ if (e.code() == Code.NONODE) {
+ return Collections.emptySet();
+ } else {
+ throw e;
+ }
+ }
+ } catch (Throwable e) {
+ throw new ServiceRuntimeException(e);
+ }
+ return new EntrySet(children);
+ }
+
+ protected String getName(Object key) {
+ String name = String.valueOf(key);
+ name = name.replace("$", "$$");
+ return name.replace('/', '$');
+ }
+
+ public String getPath(String... uris) {
+ StringBuffer buffer = new StringBuffer();
+ for (String uri : uris) {
+ buffer.append('/').append(getName(uri));
+ }
+ return buffer.toString();
+ }
+
+ protected byte[] serialize(V value) throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
+ oos.writeObject(value);
+ oos.close();
+ return bos.toByteArray();
+ }
+
+ protected V deserialize(byte[] bytes, final ClassLoader classLoader) throws IOException, ClassNotFoundException {
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ ObjectInputStream ois = new ObjectInputStream(bis) {
+ protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
+ ClassLoader loader = classLoader;
+ if (loader == null) {
+ loader = Thread.currentThread().getContextClassLoader();
+ }
+ try {
+ return Class.forName(desc.getName(), false, loader);
+ } catch (ClassNotFoundException e) {
+ return super.resolveClass(desc);
+ }
+ }
+ };
+ return (V)ois.readObject();
+ }
+
+ @Override
+ public V get(Object key) {
+ String path = getPath(root, getName(key));
+ return getData(path);
+ }
+
+ protected V getData(String path) {
+ try {
+ Stat stat = new Stat();
+ byte[] data = zooKeeper.getData(path, false, stat);
+ return deserialize(data, classLoader);
+ } catch (KeeperException e) {
+ if (e.code() == Code.NONODE) {
+ return null;
+ } else {
+ throw new ServiceRuntimeException(e);
+ }
+ } catch (Throwable e) {
+ throw new ServiceRuntimeException(e);
+ }
+
+ }
+
+ @Override
+ public V put(String key, V value) {
+ try {
+ String path = getPath(root, getName(key));
+ Stat stat = new Stat();
+ byte[] data = serialize(value);
+
+ try {
+ byte[] oldData = zooKeeper.getData(path, false, stat);
+ zooKeeper.setData(path, data, -1);
+ return deserialize(oldData, classLoader);
+ } catch (KeeperException e) {
+ if (e.code() == Code.NONODE) {
+ zooKeeper.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ return null;
+ } else {
+ throw e;
+ }
+ }
+ } catch (Throwable e) {
+ throw new ServiceRuntimeException(e);
+ }
+
+ }
+
+ @Override
+ public V remove(Object key) {
+ try {
+ String path = getPath(root, getName(key));
+ try {
+ Stat stat = new Stat();
+ byte[] oldData = zooKeeper.getData(path, false, stat);
+ zooKeeper.delete(path, -1);
+ return deserialize(oldData, classLoader);
+ } catch (KeeperException e) {
+ if (e.code() == Code.NONODE) {
+ return null;
+ } else {
+ throw e;
+ }
+ }
+ } catch (Throwable e) {
+ throw new ServiceRuntimeException(e);
+ }
+
+ }
+
+ private class MapEntry implements Map.Entry<String, V> {
+ private String key;
+
+ /**
+ * @param key
+ */
+ public MapEntry(String key) {
+ super();
+ this.key = key;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public V getValue() {
+ try {
+ try {
+ byte[] data = zooKeeper.getData(getPath(root, getName(key)), false, new Stat());
+ return deserialize(data, classLoader);
+ } catch (KeeperException e) {
+ if (e.code() == Code.NONODE) {
+ return null;
+ } else {
+ throw e;
+ }
+ }
+ } catch (Throwable e) {
+ throw new ServiceRuntimeException(e);
+ }
+ }
+
+ public V setValue(V value) {
+ return put(key, value);
+ }
+ }
+
+ private class EntrySet extends AbstractSet<Map.Entry<String, V>> {
+ private final int size;
+ private final Iterator<String> childrenIterator;
+
+ /**
+ * @param size
+ * @param childrenIterator
+ */
+ public EntrySet(Collection<String> children) {
+ super();
+ this.size = children.size();
+ this.childrenIterator = children.iterator();
+ }
+
+ @Override
+ public Iterator<Map.Entry<String, V>> iterator() {
+ return new Iterator<Map.Entry<String, V>>() {
+ private String path;
+
+ public boolean hasNext() {
+ return childrenIterator.hasNext();
+ }
+
+ public Map.Entry<String, V> next() {
+ path = childrenIterator.next();
+ return new MapEntry(path);
+ }
+
+ public void remove() {
+ childrenIterator.remove();
+ try {
+ zooKeeper.delete(getPath(root, path), -1);
+ } catch (Throwable e) {
+ throw new ServiceRuntimeException(e);
+ }
+ }
+ };
+
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+ }
+
+ public void process(WatchedEvent event) {
+ System.out.println(event);
+ }
+}