summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java1564
-rw-r--r--sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/DisableMcastInterceptor.java41
-rw-r--r--sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java83
-rw-r--r--sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java420
-rw-r--r--sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java163
-rw-r--r--sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/TribesDomainRegistryFactory.java49
-rw-r--r--sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.DomainRegistryFactory17
-rw-r--r--sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java170
-rw-r--r--sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistryTestCase.java85
9 files changed, 2592 insertions, 0 deletions
diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java
new file mode 100644
index 0000000000..fe683af025
--- /dev/null
+++ b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java
@@ -0,0 +1,1564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.tribes;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.Heartbeat;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.group.Response;
+import org.apache.catalina.tribes.group.RpcCallback;
+import org.apache.catalina.tribes.group.RpcChannel;
+import org.apache.catalina.tribes.io.XByteBuffer;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.tipis.ReplicatedMapEntry;
+import org.apache.catalina.tribes.util.Arrays;
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+
+/**
+ * This file is copied from:
+ * https://svn.apache.org/repos/asf/tomcat/tc6.0.x/tags/TOMCAT_6_0_20/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
+ *
+ * We need to intercept the put/remove calls on the base Map to fire events
+ */
+public abstract class AbstractReplicatedMap extends MapStore implements RpcCallback, ChannelListener,
+ MembershipListener, Heartbeat {
+ private static final long serialVersionUID = -2703358020113899074L;
+
+ protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class);
+
+ /**
+ * The default initial capacity - MUST be a power of two.
+ */
+ public static final int DEFAULT_INITIAL_CAPACITY = 16;
+
+ /**
+ * The load factor used when none specified in constructor.
+ **/
+ public static final float DEFAULT_LOAD_FACTOR = 0.75f;
+
+ /**
+ * Used to identify the map
+ */
+ final String chset = "ISO-8859-1";
+
+ //------------------------------------------------------------------------------
+ // INSTANCE VARIABLES
+ //------------------------------------------------------------------------------
+ protected abstract int getStateMessageType();
+
+ /**
+ * Timeout for RPC messages, how long we will wait for a reply
+ */
+ protected transient long rpcTimeout = 5000;
+ /**
+ * Reference to the channel for sending messages
+ */
+ protected transient Channel channel;
+ /**
+ * The RpcChannel to send RPC messages through
+ */
+ protected transient RpcChannel rpcChannel;
+ /**
+ * The Map context name makes this map unique, this
+ * allows us to have more than one map shared
+ * through one channel
+ */
+ protected transient byte[] mapContextName;
+ /**
+ * Has the state been transferred
+ */
+ protected transient boolean stateTransferred = false;
+ /**
+ * Simple lock object for transfers
+ */
+ protected transient Object stateMutex = new Object();
+ /**
+ * A list of members in our map
+ */
+ protected transient Map<Member, Long> mapMembers = new HashMap<Member, Long>();
+ /**
+ * Our default send options
+ */
+ protected transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
+ /**
+ * The owner of this map, ala a SessionManager for example
+ */
+ protected transient MapOwner mapOwner;
+ /**
+ * External class loaders if serialization and deserialization is to be performed successfully.
+ */
+ protected transient ClassLoader[] externalLoaders;
+
+ /**
+ * The node we are currently backing up data to, this index will rotate
+ * on a round robin basis
+ */
+ protected transient int currentNode = 0;
+
+ /**
+ * Since the map keeps internal membership
+ * this is the timeout for a ping message to be responded to
+ * If a remote map doesn't respond within this timeframe,
+ * its considered dead.
+ */
+ protected transient long accessTimeout = 5000;
+
+ /**
+ * Readable string of the mapContextName value
+ */
+ protected transient String mapname = "";
+
+ //------------------------------------------------------------------------------
+ // map owner interface
+ //------------------------------------------------------------------------------
+
+ public static interface MapOwner {
+ public void objectMadePrimay(Object key, Object value);
+ }
+
+ //------------------------------------------------------------------------------
+ // CONSTRUCTORS
+ //------------------------------------------------------------------------------
+
+ /**
+ * Creates a new map
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messags
+ * @param mapContextName String - unique name for this map, to allow multiple maps per channel
+ * @param initialCapacity int - the size of this map, see HashMap
+ * @param loadFactor float - load factor, see HashMap
+ * @param cls - a list of classloaders to be used for deserialization of objects.
+ */
+ public AbstractReplicatedMap(MapOwner owner,
+ Channel channel,
+ long timeout,
+ String mapContextName,
+ int initialCapacity,
+ float loadFactor,
+ int channelSendOptions,
+ ClassLoader[] cls) {
+ super(initialCapacity, loadFactor, 15);
+ init(owner, channel, mapContextName, timeout, channelSendOptions, cls);
+
+ }
+
+ /**
+ * Helper methods, wraps a single member in an array
+ * @param m Member
+ * @return Member[]
+ */
+ protected Member[] wrap(Member m) {
+ if (m == null)
+ return new Member[0];
+ else
+ return new Member[] {m};
+ }
+
+ /**
+ * Initializes the map by creating the RPC channel, registering itself as a channel listener
+ * This method is also responsible for initiating the state transfer
+ * @param owner Object
+ * @param channel Channel
+ * @param mapContextName String
+ * @param timeout long
+ * @param channelSendOptions int
+ * @param cls ClassLoader[]
+ */
+ protected void init(MapOwner owner,
+ Channel channel,
+ String mapContextName,
+ long timeout,
+ int channelSendOptions,
+ ClassLoader[] cls) {
+ log.info("Initializing AbstractReplicatedMap with context name:" + mapContextName);
+ this.mapOwner = owner;
+ this.externalLoaders = cls;
+ this.channelSendOptions = channelSendOptions;
+ this.channel = channel;
+ this.rpcTimeout = timeout;
+
+ try {
+ this.mapname = mapContextName;
+ //unique context is more efficient if it is stored as bytes
+ this.mapContextName = mapContextName.getBytes(chset);
+ } catch (UnsupportedEncodingException x) {
+ log.warn("Unable to encode mapContextName[" + mapContextName
+ + "] using getBytes("
+ + chset
+ + ") using default getBytes()", x);
+ this.mapContextName = mapContextName.getBytes();
+ }
+ if (log.isTraceEnabled())
+ log.trace("Created Lazy Map with name:" + mapContextName
+ + ", bytes:"
+ + Arrays.toString(this.mapContextName));
+
+ //create an rpc channel and add the map as a listener
+ this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
+ //add this map as a message listener
+ this.channel.addChannelListener(this);
+ //listen for membership notifications
+ this.channel.addMembershipListener(this);
+
+ try {
+ //broadcast our map, this just notifies other members of our existence
+ broadcast(MapMessage.MSG_INIT, true);
+ //transfer state from another map
+ transferState();
+ //state is transferred, we are ready for messaging
+ broadcast(MapMessage.MSG_START, true);
+ } catch (ChannelException x) {
+ log.warn("Unable to send map start message.");
+ throw new RuntimeException("Unable to start replicated map.", x);
+ }
+ }
+
+ /**
+ * Sends a ping out to all the members in the cluster, not just map members
+ * that this map is alive.
+ * @param timeout long
+ * @throws ChannelException
+ */
+ protected void ping(long timeout) throws ChannelException {
+ //send out a map membership message, only wait for the first reply
+ MapMessage msg =
+ new MapMessage(this.mapContextName, MapMessage.MSG_INIT, false, null, null, null, channel
+ .getLocalMember(false), null);
+ if (channel.getMembers().length > 0) {
+ //send a ping, wait for all nodes to reply
+ Response[] resp =
+ rpcChannel.send(channel.getMembers(),
+ msg,
+ RpcChannel.ALL_REPLY,
+ (channelSendOptions),
+ (int)accessTimeout);
+ for (int i = 0; i < resp.length; i++) {
+ memberAlive(resp[i].getSource());
+ } //for
+ }
+ //update our map of members, expire some if we didn't receive a ping back
+ synchronized (mapMembers) {
+ Iterator it = mapMembers.entrySet().iterator();
+ long now = System.currentTimeMillis();
+ while (it.hasNext()) {
+ Map.Entry entry = (Map.Entry)it.next();
+ long access = ((Long)entry.getValue()).longValue();
+ if ((now - access) > timeout) {
+ it.remove();
+ memberDisappeared((Member)entry.getKey());
+ }
+ }
+ }//synch
+ }
+
+ /**
+ * We have received a member alive notification
+ * @param member Member
+ */
+ protected void memberAlive(Member member) {
+ synchronized (mapMembers) {
+ if (!mapMembers.containsKey(member)) {
+ mapMemberAdded(member);
+ } //end if
+ mapMembers.put(member, new Long(System.currentTimeMillis()));
+ }
+ }
+
+ /**
+ * Helper method to broadcast a message to all members in a channel
+ * @param msgtype int
+ * @param rpc boolean
+ * @throws ChannelException
+ */
+ protected void broadcast(int msgtype, boolean rpc) throws ChannelException {
+ //send out a map membership message, only wait for the first reply
+ MapMessage msg =
+ new MapMessage(this.mapContextName, msgtype, false, null, null, null, channel.getLocalMember(false), null);
+ if (rpc) {
+ Response[] resp =
+ rpcChannel.send(channel.getMembers(), msg, RpcChannel.FIRST_REPLY, (channelSendOptions), rpcTimeout);
+ for (int i = 0; i < resp.length; i++) {
+ mapMemberAdded(resp[i].getSource());
+ messageReceived(resp[i].getMessage(), resp[i].getSource());
+ }
+ } else {
+ channel.send(channel.getMembers(), msg, channelSendOptions);
+ }
+ }
+
+ public void breakdown() {
+ finalize();
+ }
+
+ public void finalize() {
+ try {
+ broadcast(MapMessage.MSG_STOP, false);
+ } catch (Exception ignore) {
+ }
+ //cleanup
+ if (this.rpcChannel != null) {
+ this.rpcChannel.breakdown();
+ }
+ if (this.channel != null) {
+ this.channel.removeChannelListener(this);
+ this.channel.removeMembershipListener(this);
+ }
+ this.rpcChannel = null;
+ this.channel = null;
+ this.mapMembers.clear();
+ super.clear();
+ this.stateTransferred = false;
+ this.externalLoaders = null;
+ }
+
+ public int hashCode() {
+ return Arrays.hashCode(this.mapContextName);
+ }
+
+ public boolean equals(Object o) {
+ if (o == null)
+ return false;
+ if (!(o instanceof AbstractReplicatedMap))
+ return false;
+ if (!(o.getClass().equals(this.getClass())))
+ return false;
+ AbstractReplicatedMap other = (AbstractReplicatedMap)o;
+ return Arrays.equals(mapContextName, other.mapContextName);
+ }
+
+ //------------------------------------------------------------------------------
+ // GROUP COM INTERFACES
+ //------------------------------------------------------------------------------
+ public Member[] getMapMembers(Map<Member, Long> members) {
+ synchronized (members) {
+ Member[] result = new Member[members.size()];
+ members.keySet().toArray(result);
+ return result;
+ }
+ }
+
+ public Member[] getMapMembers() {
+ return getMapMembers(this.mapMembers);
+ }
+
+ public Member[] getMapMembersExcl(Member[] exclude) {
+ synchronized (mapMembers) {
+ Map<Member, Long> list = (Map<Member, Long>)((HashMap<Member, Long>)mapMembers).clone();
+ for (int i = 0; i < exclude.length; i++)
+ list.remove(exclude[i]);
+ return getMapMembers(list);
+ }
+ }
+
+ /**
+ * Replicates any changes to the object since the last time
+ * The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated<br>
+ * @param complete - if set to true, the object is replicated to its backup
+ * if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will
+ * be replicated
+ */
+ public void replicate(Object key, boolean complete) {
+ if (log.isTraceEnabled())
+ log.trace("Replicate invoked on key:" + key);
+ MapEntry entry = (MapEntry)super.get(key);
+ if (entry == null)
+ return;
+ if (!entry.isSerializable())
+ return;
+ if (entry != null && entry.isPrimary() && entry.getBackupNodes() != null && entry.getBackupNodes().length > 0) {
+ Object value = entry.getValue();
+ //check to see if we need to replicate this object isDirty()||complete
+ boolean repl = complete || ((value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDirty());
+
+ if (!repl) {
+ if (log.isTraceEnabled())
+ log.trace("Not replicating:" + key + ", no change made");
+
+ return;
+ }
+ //check to see if the message is diffable
+ boolean diff = ((value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDiffable());
+ MapMessage msg = null;
+ if (diff) {
+ ReplicatedMapEntry rentry = (ReplicatedMapEntry)entry.getValue();
+ try {
+ rentry.lock();
+ //construct a diff message
+ msg =
+ new MapMessage(mapContextName, MapMessage.MSG_BACKUP, true, (Serializable)entry.getKey(), null,
+ rentry.getDiff(), entry.getPrimary(), entry.getBackupNodes());
+ } catch (IOException x) {
+ log.error("Unable to diff object. Will replicate the entire object instead.", x);
+ } finally {
+ rentry.unlock();
+ }
+
+ }
+ if (msg == null) {
+ //construct a complete
+ msg =
+ new MapMessage(mapContextName, MapMessage.MSG_BACKUP, false, (Serializable)entry.getKey(),
+ (Serializable)entry.getValue(), null, entry.getPrimary(), entry.getBackupNodes());
+
+ }
+ try {
+ if (channel != null && entry.getBackupNodes() != null && entry.getBackupNodes().length > 0) {
+ channel.send(entry.getBackupNodes(), msg, channelSendOptions);
+ }
+ } catch (ChannelException x) {
+ log.error("Unable to replicate data.", x);
+ }
+ } //end if
+
+ }
+
+ /**
+ * This can be invoked by a periodic thread to replicate out any changes.
+ * For maps that don't store objects that implement ReplicatedMapEntry, this
+ * method should be used infrequently to avoid large amounts of data transfer
+ * @param complete boolean
+ */
+ public void replicate(boolean complete) {
+ Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry<Object, Object> e = i.next();
+ replicate(e.getKey(), complete);
+ } //while
+
+ }
+
+ public void transferState() {
+ try {
+ Member[] members = getMapMembers();
+ Member backup = members.length > 0 ? (Member)members[0] : null;
+ if (backup != null) {
+ MapMessage msg =
+ new MapMessage(mapContextName, getStateMessageType(), false, null, null, null, null, null);
+ Response[] resp =
+ rpcChannel.send(new Member[] {backup}, msg, RpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);
+ if (resp.length > 0) {
+ synchronized (stateMutex) {
+ msg = (MapMessage)resp[0].getMessage();
+ msg.deserialize(getExternalLoaders());
+ List list = (List)msg.getValue();
+ for (int i = 0; i < list.size(); i++) {
+ messageReceived((Serializable)list.get(i), resp[0].getSource());
+ } //for
+ }
+ } else {
+ log.warn("Transfer state, 0 replies, probably a timeout.");
+ }
+ }
+ } catch (ChannelException x) {
+ log.error("Unable to transfer LazyReplicatedMap state.", x);
+ } catch (IOException x) {
+ log.error("Unable to transfer LazyReplicatedMap state.", x);
+ } catch (ClassNotFoundException x) {
+ log.error("Unable to transfer LazyReplicatedMap state.", x);
+ }
+ stateTransferred = true;
+ }
+
+ /**
+ * @todo implement state transfer
+ * @param msg Serializable
+ * @return Serializable - null if no reply should be sent
+ */
+ public Serializable replyRequest(Serializable msg, final Member sender) {
+ if (!(msg instanceof MapMessage))
+ return null;
+ MapMessage mapmsg = (MapMessage)msg;
+
+ //map init request
+ if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
+ mapmsg.setPrimary(channel.getLocalMember(false));
+ return mapmsg;
+ }
+
+ //map start request
+ if (mapmsg.getMsgType() == MapMessage.MSG_START) {
+ mapmsg.setPrimary(channel.getLocalMember(false));
+ mapMemberAdded(sender);
+ return mapmsg;
+ }
+
+ //backup request
+ if (mapmsg.getMsgType() == MapMessage.MSG_RETRIEVE_BACKUP) {
+ MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
+ if (entry == null || (!entry.isSerializable()))
+ return null;
+ mapmsg.setValue((Serializable)entry.getValue());
+ return mapmsg;
+ }
+
+ //state transfer request
+ if (mapmsg.getMsgType() == MapMessage.MSG_STATE || mapmsg.getMsgType() == MapMessage.MSG_STATE_COPY) {
+ synchronized (stateMutex) { //make sure we dont do two things at the same time
+ ArrayList<MapMessage> list = new ArrayList<MapMessage>();
+ Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry<Object, Object> e = i.next();
+ MapEntry entry = (MapEntry)super.get(e.getKey());
+ if (entry != null && entry.isSerializable()) {
+ boolean copy = (mapmsg.getMsgType() == MapMessage.MSG_STATE_COPY);
+ MapMessage me =
+ new MapMessage(mapContextName, copy ? MapMessage.MSG_COPY : MapMessage.MSG_PROXY, false,
+ (Serializable)entry.getKey(), copy ? (Serializable)entry.getValue() : null,
+ null, entry.getPrimary(), entry.getBackupNodes());
+ list.add(me);
+ }
+ }
+ mapmsg.setValue(list);
+ return mapmsg;
+
+ } //synchronized
+ }
+
+ return null;
+
+ }
+
+ /**
+ * If the reply has already been sent to the requesting thread,
+ * the rpc callback can handle any data that comes in after the fact.
+ * @param msg Serializable
+ * @param sender Member
+ */
+ public void leftOver(Serializable msg, Member sender) {
+ //left over membership messages
+ if (!(msg instanceof MapMessage))
+ return;
+
+ MapMessage mapmsg = (MapMessage)msg;
+ try {
+ mapmsg.deserialize(getExternalLoaders());
+ if (mapmsg.getMsgType() == MapMessage.MSG_START) {
+ mapMemberAdded(mapmsg.getPrimary());
+ } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
+ memberAlive(mapmsg.getPrimary());
+ }
+ } catch (IOException x) {
+ log.error("Unable to deserialize MapMessage.", x);
+ } catch (ClassNotFoundException x) {
+ log.error("Unable to deserialize MapMessage.", x);
+ }
+ }
+
+ public void messageReceived(Serializable msg, Member sender) {
+ if (!(msg instanceof MapMessage))
+ return;
+
+ MapMessage mapmsg = (MapMessage)msg;
+ if (log.isTraceEnabled()) {
+ log.trace("Map[" + mapname + "] received message:" + mapmsg);
+ }
+
+ try {
+ mapmsg.deserialize(getExternalLoaders());
+ } catch (IOException x) {
+ log.error("Unable to deserialize MapMessage.", x);
+ return;
+ } catch (ClassNotFoundException x) {
+ log.error("Unable to deserialize MapMessage.", x);
+ return;
+ }
+ if (log.isTraceEnabled())
+ log.trace("Map message received from:" + sender.getName() + " msg:" + mapmsg);
+ if (mapmsg.getMsgType() == MapMessage.MSG_START) {
+ mapMemberAdded(mapmsg.getPrimary());
+ }
+
+ if (mapmsg.getMsgType() == MapMessage.MSG_STOP) {
+ memberDisappeared(mapmsg.getPrimary());
+ }
+
+ if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) {
+ MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
+ if (entry == null) {
+ entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
+ entry.setBackup(false);
+ entry.setProxy(true);
+ entry.setBackupNodes(mapmsg.getBackupNodes());
+ entry.setPrimary(mapmsg.getPrimary());
+ super.put(entry.getKey(), entry);
+ } else {
+ entry.setProxy(true);
+ entry.setBackup(false);
+ entry.setBackupNodes(mapmsg.getBackupNodes());
+ entry.setPrimary(mapmsg.getPrimary());
+ }
+ }
+
+ if (mapmsg.getMsgType() == MapMessage.MSG_REMOVE) {
+ super.remove(mapmsg.getKey());
+ }
+
+ if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP || mapmsg.getMsgType() == MapMessage.MSG_COPY) {
+ MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
+ if (entry == null) {
+ entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
+ entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
+ entry.setProxy(false);
+ entry.setBackupNodes(mapmsg.getBackupNodes());
+ entry.setPrimary(mapmsg.getPrimary());
+ if (mapmsg.getValue() != null && mapmsg.getValue() instanceof ReplicatedMapEntry) {
+ ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
+ }
+ } else {
+ entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
+ entry.setProxy(false);
+ entry.setBackupNodes(mapmsg.getBackupNodes());
+ entry.setPrimary(mapmsg.getPrimary());
+ if (entry.getValue() instanceof ReplicatedMapEntry) {
+ ReplicatedMapEntry diff = (ReplicatedMapEntry)entry.getValue();
+ if (mapmsg.isDiff()) {
+ try {
+ diff.lock();
+ diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length);
+ } catch (Exception x) {
+ log.error("Unable to apply diff to key:" + entry.getKey(), x);
+ } finally {
+ diff.unlock();
+ }
+ } else {
+ if (mapmsg.getValue() != null)
+ entry.setValue(mapmsg.getValue());
+ ((ReplicatedMapEntry)entry.getValue()).setOwner(getMapOwner());
+ } //end if
+ } else if (mapmsg.getValue() instanceof ReplicatedMapEntry) {
+ ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue();
+ re.setOwner(getMapOwner());
+ entry.setValue(re);
+ } else {
+ if (mapmsg.getValue() != null)
+ entry.setValue(mapmsg.getValue());
+ } //end if
+ } //end if
+ super.put(entry.getKey(), entry);
+ } //end if
+ }
+
+ public boolean accept(Serializable msg, Member sender) {
+ boolean result = false;
+ if (msg instanceof MapMessage) {
+ if (log.isTraceEnabled())
+ log.trace("Map[" + mapname + "] accepting...." + msg);
+ result = Arrays.equals(mapContextName, ((MapMessage)msg).getMapId());
+ if (log.isTraceEnabled())
+ log.trace("Msg[" + mapname + "] accepted[" + result + "]...." + msg);
+ }
+ return result;
+ }
+
+ public void mapMemberAdded(Member member) {
+ Member self = getChannel().getLocalMember(false);
+ if (member.equals(self))
+ return;
+ boolean memberAdded = false;
+ //select a backup node if we don't have one
+ synchronized (mapMembers) {
+ if (!mapMembers.containsKey(member)) {
+ mapMembers.put(member, new Long(System.currentTimeMillis()));
+ memberAdded = true;
+ }
+ }
+ if (memberAdded) {
+ synchronized (stateMutex) {
+ Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry<Object, Object> e = i.next();
+ MapEntry entry = (MapEntry)super.get(e.getKey());
+ if (entry == null)
+ continue;
+ // if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) {
+ // [rfeng] Change the behavior to replicate to all nodes
+ if (entry.isPrimary() && self.equals(entry.getPrimary())) {
+ try {
+ Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
+ entry.setBackupNodes(backup);
+ entry.setPrimary(self);
+ } catch (ChannelException x) {
+ log.error("Unable to select backup node.", x);
+ } //catch
+ } //end if
+ } //while
+ } //synchronized
+ }//end if
+ }
+
+ public boolean inSet(Member m, Member[] set) {
+ if (set == null)
+ return false;
+ boolean result = false;
+ for (int i = 0; i < set.length && (!result); i++)
+ if (m.equals(set[i]))
+ result = true;
+ return result;
+ }
+
+ public Member[] excludeFromSet(Member[] mbrs, Member[] set) {
+ List<Member> result = new ArrayList<Member>();
+ for (int i = 0; i < set.length; i++) {
+ boolean include = true;
+ for (int j = 0; j < mbrs.length; j++)
+ if (mbrs[j].equals(set[i]))
+ include = false;
+ if (include)
+ result.add(set[i]);
+ }
+ return (Member[])result.toArray(new Member[result.size()]);
+ }
+
+ public void memberAdded(Member member) {
+ //do nothing
+ }
+
+ public void memberDisappeared(Member member) {
+ boolean removed = false;
+ synchronized (mapMembers) {
+ removed = (mapMembers.remove(member) != null);
+ if (!removed) {
+ if (log.isDebugEnabled())
+ log.debug("Member[" + member + "] disappeared, but was not present in the map.");
+ return; //the member was not part of our map.
+ }
+ }
+
+ Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry<Object, Object> e = i.next();
+ MapEntry entry = (MapEntry)super.get(e.getKey());
+ if (entry == null)
+ continue;
+ if (entry.isPrimary() && inSet(member, entry.getBackupNodes())) {
+ if (log.isDebugEnabled())
+ log.debug("[1] Primary choosing a new backup");
+ try {
+ Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
+ entry.setBackupNodes(backup);
+ entry.setPrimary(channel.getLocalMember(false));
+ } catch (ChannelException x) {
+ log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x);
+ }
+ } else if (member.equals(entry.getPrimary())) {
+ if (log.isDebugEnabled())
+ log.debug("[2] Primary disappeared");
+ entry.setPrimary(null);
+ } //end if
+
+ if (entry.isProxy() && entry.getPrimary() == null
+ && entry.getBackupNodes() != null
+ && entry.getBackupNodes().length == 1
+ && entry.getBackupNodes()[0].equals(member)) {
+ //remove proxies that have no backup nor primaries
+ if (log.isDebugEnabled())
+ log.debug("[3] Removing orphaned proxy");
+ i.remove();
+ } else if (entry.getPrimary() == null && entry.isBackup()
+ && entry.getBackupNodes() != null
+ && entry.getBackupNodes().length == 1
+ && entry.getBackupNodes()[0].equals(channel.getLocalMember(false))) {
+ try {
+ if (log.isDebugEnabled())
+ log.debug("[4] Backup becoming primary");
+ entry.setPrimary(channel.getLocalMember(false));
+ entry.setBackup(false);
+ entry.setProxy(false);
+ Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
+ entry.setBackupNodes(backup);
+ if (mapOwner != null)
+ mapOwner.objectMadePrimay(entry.getKey(), entry.getValue());
+
+ } catch (ChannelException x) {
+ log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x);
+ }
+ }
+
+ } //while
+ }
+
+ public int getNextBackupIndex() {
+ int size = mapMembers.size();
+ if (mapMembers.size() == 0)
+ return -1;
+ int node = currentNode++;
+ if (node >= size) {
+ node = 0;
+ currentNode = 0;
+ }
+ return node;
+ }
+
+ public Member getNextBackupNode() {
+ Member[] members = getMapMembers();
+ int node = getNextBackupIndex();
+ if (members.length == 0 || node == -1)
+ return null;
+ if (node >= members.length)
+ node = 0;
+ return members[node];
+ }
+
+ protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException;
+
+ public void heartbeat() {
+ try {
+ ping(accessTimeout);
+ } catch (Exception x) {
+ log.error("Unable to send AbstractReplicatedMap.ping message", x);
+ }
+ }
+
+ //------------------------------------------------------------------------------
+ // METHODS TO OVERRIDE
+ //------------------------------------------------------------------------------
+
+ /**
+ * Removes an object from this map, it will also remove it from
+ *
+ * @param key Object
+ * @return Object
+ */
+ public Object remove(Object key) {
+ return remove(key, true);
+ }
+
+ public Object remove(Object key, boolean notify) {
+ MapEntry entry = (MapEntry)super.remove(key);
+
+ try {
+ if (getMapMembers().length > 0 && notify) {
+ MapMessage msg =
+ new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable)key, null, null,
+ null, null);
+ getChannel().send(getMapMembers(), msg, getChannelSendOptions());
+ }
+ } catch (ChannelException x) {
+ log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation", x);
+ }
+ return entry != null ? entry.getValue() : null;
+ }
+
+ public MapEntry getInternal(Object key) {
+ return (MapEntry)super.get(key);
+ }
+
+ public Object get(Object key) {
+ MapEntry entry = (MapEntry)super.get(key);
+ if (log.isTraceEnabled())
+ log.trace("Requesting id:" + key + " entry:" + entry);
+ if (entry == null)
+ return null;
+ if (!entry.isPrimary()) {
+ //if the message is not primary, we need to retrieve the latest value
+ try {
+ Member[] backup = null;
+ MapMessage msg = null;
+ if (!entry.isBackup()) {
+ //make sure we don't retrieve from ourselves
+ msg =
+ new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false, (Serializable)key,
+ null, null, null, null);
+ Response[] resp =
+ getRpcChannel().send(entry.getBackupNodes(),
+ msg,
+ RpcChannel.FIRST_REPLY,
+ Channel.SEND_OPTIONS_DEFAULT,
+ getRpcTimeout());
+ if (resp == null || resp.length == 0) {
+ //no responses
+ log.warn("Unable to retrieve remote object for key:" + key);
+ return null;
+ }
+ msg = (MapMessage)resp[0].getMessage();
+ msg.deserialize(getExternalLoaders());
+ backup = entry.getBackupNodes();
+ if (entry.getValue() instanceof ReplicatedMapEntry) {
+ ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
+ val.setOwner(getMapOwner());
+ }
+ if (msg.getValue() != null)
+ entry.setValue(msg.getValue());
+ }
+ if (entry.isBackup()) {
+ //select a new backup node
+ backup = publishEntryInfo(key, entry.getValue());
+ } else if (entry.isProxy()) {
+ //invalidate the previous primary
+ msg =
+ new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false, (Serializable)key, null, null,
+ channel.getLocalMember(false), backup);
+ Member[] dest = getMapMembersExcl(backup);
+ if (dest != null && dest.length > 0) {
+ getChannel().send(dest, msg, getChannelSendOptions());
+ }
+ }
+ entry.setPrimary(channel.getLocalMember(false));
+ entry.setBackupNodes(backup);
+ entry.setBackup(false);
+ entry.setProxy(false);
+
+ } catch (Exception x) {
+ log.error("Unable to replicate out data for a LazyReplicatedMap.get operation", x);
+ return null;
+ }
+ }
+ if (log.isTraceEnabled())
+ log.trace("Requesting id:" + key + " result:" + entry.getValue());
+ if (entry.getValue() != null && entry.getValue() instanceof ReplicatedMapEntry) {
+ ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
+ //hack, somehow this is not being set above
+ val.setOwner(getMapOwner());
+
+ }
+ return entry.getValue();
+ }
+
+ protected void printMap(String header) {
+ try {
+ System.out.println("\nDEBUG MAP:" + header);
+ System.out.println("Map[" + new String(mapContextName, chset) + ", Map Size:" + super.size());
+ Member[] mbrs = getMapMembers();
+ for (int i = 0; i < mbrs.length; i++) {
+ System.out.println("Mbr[" + (i + 1) + "=" + mbrs[i].getName());
+ }
+ Iterator i = super.entrySet().iterator();
+ int cnt = 0;
+
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry)i.next();
+ System.out.println((++cnt) + ". " + super.get(e.getKey()));
+ }
+ System.out.println("EndMap]\n\n");
+ } catch (Exception ignore) {
+ ignore.printStackTrace();
+ }
+ }
+
+ /**
+ * Returns true if the key has an entry in the map.
+ * The entry can be a proxy or a backup entry, invoking <code>get(key)</code>
+ * will make this entry primary for the group
+ * @param key Object
+ * @return boolean
+ */
+ public boolean containsKey(Object key) {
+ return super.containsKey(key);
+ }
+
+ public Object put(Object key, Object value) {
+ return put(key, value, true);
+ }
+
+ public Object put(Object key, Object value, boolean notify) {
+ MapEntry entry = new MapEntry(key, value);
+ entry.setBackup(false);
+ entry.setProxy(false);
+ entry.setPrimary(channel.getLocalMember(false));
+
+ Object old = null;
+
+ //make sure that any old values get removed
+ if (containsKey(key))
+ old = remove(key);
+ try {
+ if (notify) {
+ Member[] backup = publishEntryInfo(key, value);
+ entry.setBackupNodes(backup);
+ }
+ } catch (ChannelException x) {
+ log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x);
+ }
+ super.put(key, entry);
+ return old;
+ }
+
+ /**
+ * Copies all values from one map to this instance
+ * @param m Map
+ */
+ public void putAll(Map m) {
+ Iterator i = m.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry entry = (Map.Entry)i.next();
+ put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ public void clear() {
+ clear(true);
+ }
+
+ public void clear(boolean notify) {
+ if (notify) {
+ //only delete active keys
+ Iterator<Object> keys = keySet().iterator();
+ while (keys.hasNext())
+ remove(keys.next());
+ } else {
+ super.clear();
+ }
+ }
+
+ public boolean containsValue(Object value) {
+ if (value == null) {
+ return super.containsValue(value);
+ } else {
+ Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry<Object, Object> e = i.next();
+ MapEntry entry = (MapEntry)super.get(e.getKey());
+ if (entry != null && entry.isPrimary() && value.equals(entry.getValue()))
+ return true;
+ }//while
+ return false;
+ }//end if
+ }
+
+ public Object clone() {
+ throw new UnsupportedOperationException("This operation is not valid on a replicated map");
+ }
+
+ /**
+ * Returns the entire contents of the map
+ * Map.Entry.getValue() will return a LazyReplicatedMap.MapEntry object containing all the information
+ * about the object.
+ * @return Set
+ */
+ public Set<Map.Entry<Object, Object>> entrySetFull() {
+ return super.entrySet();
+ }
+
+ public Set<Object> keySetFull() {
+ return super.keySet();
+ }
+
+ public int sizeFull() {
+ return super.size();
+ }
+
+ public Set<Map.Entry<Object, Object>> entrySet() {
+ Set<Map.Entry<Object, Object>> set = new LinkedHashSet<Map.Entry<Object, Object>>(super.size());
+ Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry<Object, Object> e = i.next();
+ Object key = e.getKey();
+ MapEntry entry = (MapEntry)super.get(key);
+ if (entry != null && entry.isPrimary()) {
+ set.add(new MapEntry(key, entry.getValue()));
+ }
+ }
+ return Collections.unmodifiableSet(set);
+ }
+
+ public Set<Object> keySet() {
+ //todo implement
+ //should only return keys where this is active.
+ LinkedHashSet<Object> set = new LinkedHashSet<Object>(super.size());
+ Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry<Object, Object> e = i.next();
+ Object key = e.getKey();
+ MapEntry entry = (MapEntry)super.get(key);
+ if (entry != null && entry.isPrimary())
+ set.add(key);
+ }
+ return Collections.unmodifiableSet(set);
+
+ }
+
+ public int size() {
+ //todo, implement a counter variable instead
+ //only count active members in this node
+ int counter = 0;
+ Iterator<Map.Entry<Object, Object>> it = super.entrySet().iterator();
+ while (it != null && it.hasNext()) {
+ Map.Entry<Object, Object> e = it.next();
+ if (e != null) {
+ MapEntry entry = (MapEntry)super.get(e.getKey());
+ if (entry != null && entry.isPrimary() && entry.getValue() != null)
+ counter++;
+ }
+ }
+ return counter;
+ }
+
+ protected boolean removeEldestEntry(Map.Entry<Object, Object> eldest) {
+ return false;
+ }
+
+ public boolean isEmpty() {
+ return size() == 0;
+ }
+
+ public Collection<Object> values() {
+ ArrayList<Object> values = new ArrayList<Object>();
+ Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry<Object, Object> e = i.next();
+ MapEntry entry = (MapEntry)super.get(e.getKey());
+ if (entry != null && entry.isPrimary() && entry.getValue() != null)
+ values.add(entry.getValue());
+ }
+ return Collections.unmodifiableCollection(values);
+ }
+
+ //------------------------------------------------------------------------------
+ // Map Entry class
+ //------------------------------------------------------------------------------
+ public static class MapEntry implements Map.Entry<Object, Object> {
+ private boolean backup;
+ private boolean proxy;
+ private Member[] backupNodes;
+ private Member primary;
+ private Object key;
+ private Object value;
+
+ public MapEntry(Object key, Object value) {
+ setKey(key);
+ setValue(value);
+
+ }
+
+ public boolean isKeySerializable() {
+ return (key == null) || (key instanceof Serializable);
+ }
+
+ public boolean isValueSerializable() {
+ return (value == null) || (value instanceof Serializable);
+ }
+
+ public boolean isSerializable() {
+ return isKeySerializable() && isValueSerializable();
+ }
+
+ public boolean isBackup() {
+ return backup;
+ }
+
+ public void setBackup(boolean backup) {
+ this.backup = backup;
+ }
+
+ public boolean isProxy() {
+ return proxy;
+ }
+
+ public boolean isPrimary() {
+ return ((!proxy) && (!backup));
+ }
+
+ public void setProxy(boolean proxy) {
+ this.proxy = proxy;
+ }
+
+ public boolean isDiffable() {
+ return (value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDiffable();
+ }
+
+ public void setBackupNodes(Member[] nodes) {
+ this.backupNodes = nodes;
+ }
+
+ public Member[] getBackupNodes() {
+ return backupNodes;
+ }
+
+ public void setPrimary(Member m) {
+ primary = m;
+ }
+
+ public Member getPrimary() {
+ return primary;
+ }
+
+ public Object getValue() {
+ return value;
+ }
+
+ public Object setValue(Object value) {
+ Object old = this.value;
+ this.value = value;
+ return old;
+ }
+
+ public Object getKey() {
+ return key;
+ }
+
+ public Object setKey(Object key) {
+ Object old = this.key;
+ this.key = key;
+ return old;
+ }
+
+ public int hashCode() {
+ return key.hashCode();
+ }
+
+ public boolean equals(Object o) {
+ return key.equals(o);
+ }
+
+ /**
+ * apply a diff, or an entire object
+ * @param data byte[]
+ * @param offset int
+ * @param length int
+ * @param diff boolean
+ * @throws IOException
+ * @throws ClassNotFoundException
+ */
+ public void apply(byte[] data, int offset, int length, boolean diff) throws IOException, ClassNotFoundException {
+ if (isDiffable() && diff) {
+ ReplicatedMapEntry rentry = (ReplicatedMapEntry)value;
+ try {
+ rentry.lock();
+ rentry.applyDiff(data, offset, length);
+ } finally {
+ rentry.unlock();
+ }
+ } else if (length == 0) {
+ value = null;
+ proxy = true;
+ } else {
+ value = XByteBuffer.deserialize(data, offset, length);
+ }
+ }
+
+ public String toString() {
+ StringBuffer buf = new StringBuffer("MapEntry[key:");
+ buf.append(getKey()).append("; ");
+ buf.append("value:").append(getValue()).append("; ");
+ buf.append("primary:").append(isPrimary()).append("; ");
+ buf.append("backup:").append(isBackup()).append("; ");
+ buf.append("proxy:").append(isProxy()).append(";]");
+ return buf.toString();
+ }
+
+ }
+
+ //------------------------------------------------------------------------------
+ // map message to send to and from other maps
+ //------------------------------------------------------------------------------
+
+ public static class MapMessage implements Serializable {
+ private static final long serialVersionUID = -7847288807489375686L;
+ public static final int MSG_BACKUP = 1;
+ public static final int MSG_RETRIEVE_BACKUP = 2;
+ public static final int MSG_PROXY = 3;
+ public static final int MSG_REMOVE = 4;
+ public static final int MSG_STATE = 5;
+ public static final int MSG_START = 6;
+ public static final int MSG_STOP = 7;
+ public static final int MSG_INIT = 8;
+ public static final int MSG_COPY = 9;
+ public static final int MSG_STATE_COPY = 10;
+
+ private byte[] mapId;
+ private int msgtype;
+ private boolean diff;
+ private transient Serializable key;
+ private transient Serializable value;
+ private byte[] valuedata;
+ private byte[] keydata;
+ private byte[] diffvalue;
+ private Member[] nodes;
+ private Member primary;
+
+ public String toString() {
+ StringBuffer buf = new StringBuffer("MapMessage[context=");
+ buf.append(new String(mapId));
+ buf.append("; type=");
+ buf.append(getTypeDesc());
+ buf.append("; key=");
+ buf.append(key);
+ buf.append("; value=");
+ buf.append(value);
+ return buf.toString();
+ }
+
+ public String getTypeDesc() {
+ switch (msgtype) {
+ case MSG_BACKUP:
+ return "MSG_BACKUP";
+ case MSG_RETRIEVE_BACKUP:
+ return "MSG_RETRIEVE_BACKUP";
+ case MSG_PROXY:
+ return "MSG_PROXY";
+ case MSG_REMOVE:
+ return "MSG_REMOVE";
+ case MSG_STATE:
+ return "MSG_STATE";
+ case MSG_START:
+ return "MSG_START";
+ case MSG_STOP:
+ return "MSG_STOP";
+ case MSG_INIT:
+ return "MSG_INIT";
+ case MSG_STATE_COPY:
+ return "MSG_STATE_COPY";
+ case MSG_COPY:
+ return "MSG_COPY";
+ default:
+ return "UNKNOWN";
+ }
+ }
+
+ public MapMessage() {
+ }
+
+ public MapMessage(byte[] mapId,
+ int msgtype,
+ boolean diff,
+ Serializable key,
+ Serializable value,
+ byte[] diffvalue,
+ Member primary,
+ Member[] nodes) {
+ this.mapId = mapId;
+ this.msgtype = msgtype;
+ this.diff = diff;
+ this.key = key;
+ this.value = value;
+ this.diffvalue = diffvalue;
+ this.nodes = nodes;
+ this.primary = primary;
+ setValue(value);
+ setKey(key);
+ }
+
+ public void deserialize(ClassLoader[] cls) throws IOException, ClassNotFoundException {
+ key(cls);
+ value(cls);
+ }
+
+ public int getMsgType() {
+ return msgtype;
+ }
+
+ public boolean isDiff() {
+ return diff;
+ }
+
+ public Serializable getKey() {
+ try {
+ return key(null);
+ } catch (Exception x) {
+ log.error("Deserialization error of the MapMessage.key", x);
+ return null;
+ }
+ }
+
+ public Serializable key(ClassLoader[] cls) throws IOException, ClassNotFoundException {
+ if (key != null)
+ return key;
+ if (keydata == null || keydata.length == 0)
+ return null;
+ key = XByteBuffer.deserialize(keydata, 0, keydata.length, cls);
+ keydata = null;
+ return key;
+ }
+
+ public byte[] getKeyData() {
+ return keydata;
+ }
+
+ public Serializable getValue() {
+ try {
+ return value(null);
+ } catch (Exception x) {
+ log.error("Deserialization error of the MapMessage.value", x);
+ return null;
+ }
+ }
+
+ public Serializable value(ClassLoader[] cls) throws IOException, ClassNotFoundException {
+ if (value != null)
+ return value;
+ if (valuedata == null || valuedata.length == 0)
+ return null;
+ value = XByteBuffer.deserialize(valuedata, 0, valuedata.length, cls);
+ valuedata = null;
+ ;
+ return value;
+ }
+
+ public byte[] getValueData() {
+ return valuedata;
+ }
+
+ public byte[] getDiffValue() {
+ return diffvalue;
+ }
+
+ public Member[] getBackupNodes() {
+ return nodes;
+ }
+
+ private void setBackUpNodes(Member[] nodes) {
+ this.nodes = nodes;
+ }
+
+ public Member getPrimary() {
+ return primary;
+ }
+
+ private void setPrimary(Member m) {
+ primary = m;
+ }
+
+ public byte[] getMapId() {
+ return mapId;
+ }
+
+ public void setValue(Serializable value) {
+ try {
+ if (value != null)
+ valuedata = XByteBuffer.serialize(value);
+ this.value = value;
+ } catch (IOException x) {
+ throw new RuntimeException(x);
+ }
+ }
+
+ public void setKey(Serializable key) {
+ try {
+ if (key != null)
+ keydata = XByteBuffer.serialize(key);
+ this.key = key;
+ } catch (IOException x) {
+ throw new RuntimeException(x);
+ }
+ }
+
+ protected Member[] readMembers(ObjectInput in) throws IOException, ClassNotFoundException {
+ int nodecount = in.readInt();
+ Member[] members = new Member[nodecount];
+ for (int i = 0; i < members.length; i++) {
+ byte[] d = new byte[in.readInt()];
+ in.read(d);
+ if (d.length > 0)
+ members[i] = MemberImpl.getMember(d);
+ }
+ return members;
+ }
+
+ protected void writeMembers(ObjectOutput out, Member[] members) throws IOException {
+ if (members == null)
+ members = new Member[0];
+ out.writeInt(members.length);
+ for (int i = 0; i < members.length; i++) {
+ if (members[i] != null) {
+ byte[] d = members[i] != null ? ((MemberImpl)members[i]).getData(false) : new byte[0];
+ out.writeInt(d.length);
+ out.write(d);
+ }
+ }
+ }
+
+ /**
+ * shallow clone
+ * @return Object
+ */
+ public Object clone() {
+ MapMessage msg =
+ new MapMessage(this.mapId, this.msgtype, this.diff, this.key, this.value, this.diffvalue, this.primary,
+ this.nodes);
+ msg.keydata = this.keydata;
+ msg.valuedata = this.valuedata;
+ return msg;
+ }
+ } //MapMessage
+
+ public Channel getChannel() {
+ return channel;
+ }
+
+ public byte[] getMapContextName() {
+ return mapContextName;
+ }
+
+ public RpcChannel getRpcChannel() {
+ return rpcChannel;
+ }
+
+ public long getRpcTimeout() {
+ return rpcTimeout;
+ }
+
+ public Object getStateMutex() {
+ return stateMutex;
+ }
+
+ public boolean isStateTransferred() {
+ return stateTransferred;
+ }
+
+ public MapOwner getMapOwner() {
+ return mapOwner;
+ }
+
+ public ClassLoader[] getExternalLoaders() {
+ return externalLoaders;
+ }
+
+ public int getChannelSendOptions() {
+ return channelSendOptions;
+ }
+
+ public long getAccessTimeout() {
+ return accessTimeout;
+ }
+
+ public void setMapOwner(MapOwner mapOwner) {
+ this.mapOwner = mapOwner;
+ }
+
+ public void setExternalLoaders(ClassLoader[] externalLoaders) {
+ this.externalLoaders = externalLoaders;
+ }
+
+ public void setChannelSendOptions(int channelSendOptions) {
+ this.channelSendOptions = channelSendOptions;
+ }
+
+ public void setAccessTimeout(long accessTimeout) {
+ this.accessTimeout = accessTimeout;
+ }
+
+}
diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/DisableMcastInterceptor.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/DisableMcastInterceptor.java
new file mode 100644
index 0000000000..1e4d3af50d
--- /dev/null
+++ b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/DisableMcastInterceptor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.tribes;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+
+/**
+ * A static interceptor to disables multicast.
+ * Can be removed when/if the function gets added to Tribes.
+ * See Tomcat email http://markmail.org/message/doqu7pfl2hvvdfcl
+ */
+public class DisableMcastInterceptor extends ChannelInterceptorBase {
+
+ public DisableMcastInterceptor() {
+ super();
+ }
+
+ public void start(int svc) throws ChannelException {
+ svc = (svc & (~Channel.MBR_TX_SEQ));
+ super.start(svc);
+ }
+}
diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java
new file mode 100644
index 0000000000..9e540743bf
--- /dev/null
+++ b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.tribes;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * The Map that can fire events on put/remove of entries
+ */
+public abstract class MapStore extends ConcurrentHashMap<Object, Object> {
+ private static final long serialVersionUID = -2127235547082144368L;
+ private List<MapListener> listeners = new CopyOnWriteArrayList<MapListener>();
+
+ protected MapStore(int initialCapacity, float loadFactor, int concurrencyLevel) {
+ super(initialCapacity, loadFactor, concurrencyLevel);
+ }
+
+ @Override
+ public Object put(Object key, Object value) {
+ Object old = super.put(key, value);
+ if (old != null) {
+ for (MapListener listener : listeners) {
+ listener.entryUpdated(key, old, value);
+ }
+ } else {
+ for (MapListener listener : listeners) {
+ listener.entryAdded(key, value);
+ }
+
+ }
+ return old;
+ }
+
+ @Override
+ public Object remove(Object key) {
+ Object old = super.remove(key);
+ if (old != null) {
+ for (MapListener listener : listeners) {
+ listener.entryRemoved(key, old);
+ }
+ }
+ return old;
+ }
+
+ public void addListener(MapListener listener) {
+ listeners.add(listener);
+ }
+
+ public List<MapListener> getListeners() {
+ return listeners;
+ }
+
+ public boolean removeListener(MapListener listener) {
+ return listeners.remove(listener);
+ }
+
+ public static interface MapListener {
+ void entryAdded(Object key, Object value);
+
+ void entryUpdated(Object key, Object oldValue, Object newValue);
+
+ void entryRemoved(Object key, Object value);
+ }
+}
diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
new file mode 100644
index 0000000000..5dc627d88d
--- /dev/null
+++ b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.tribes;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.URI;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelReceiver;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.GroupChannel;
+import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
+import org.apache.catalina.tribes.membership.McastService;
+import org.apache.catalina.tribes.membership.StaticMember;
+import org.apache.catalina.tribes.transport.ReceiverBase;
+import org.apache.tuscany.sca.assembly.Endpoint;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.LifeCycleListener;
+import org.apache.tuscany.sca.endpoint.tribes.AbstractReplicatedMap.MapEntry;
+import org.apache.tuscany.sca.endpoint.tribes.MapStore.MapListener;
+import org.apache.tuscany.sca.runtime.BaseEndpointRegistry;
+import org.apache.tuscany.sca.runtime.DomainRegistryURI;
+import org.apache.tuscany.sca.runtime.EndpointRegistry;
+import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+
+/**
+ * A replicated EndpointRegistry based on Apache Tomcat Tribes
+ */
+public class ReplicatedEndpointRegistry extends BaseEndpointRegistry implements EndpointRegistry, LifeCycleListener,
+ MapListener {
+ private final static Logger logger = Logger.getLogger(ReplicatedEndpointRegistry.class.getName());
+ private static final String MULTICAST_ADDRESS = "228.0.0.100";
+ private static final int MULTICAST_PORT = 50000;
+
+ private static final int FIND_REPEAT_COUNT = 10;
+
+ private int port = MULTICAST_PORT;
+ private String address = MULTICAST_ADDRESS;
+ private String bind = null;
+ private int timeout = 50;
+ private String receiverAddress;
+ private int receiverPort = 4000;
+ private int receiverAutoBind = 100;
+ private List<URI> staticRoutes;
+
+ private ReplicatedMap map;
+
+ private String id;
+ private boolean noMultiCast;
+
+ private static final GroupChannel createChannel(String address, int port, String bindAddress) {
+
+ //create a channel
+ GroupChannel channel = new GroupChannel();
+ McastService mcastService = (McastService)channel.getMembershipService();
+ mcastService.setPort(port);
+ mcastService.setAddress(address);
+
+ // REVIEW: In my case, there are multiple IP addresses
+ // One for the WIFI and the other one for VPN. For some reason the VPN one doesn't support
+ // Multicast
+
+ if (bindAddress != null) {
+ mcastService.setBind(bindAddress);
+ } else {
+ mcastService.setBind(getBindAddress());
+ }
+
+ return channel;
+ }
+
+ public ReplicatedEndpointRegistry(ExtensionPointRegistry registry,
+ Map<String, String> attributes,
+ String domainRegistryURI,
+ String domainURI) {
+ super(registry, attributes, domainRegistryURI, domainURI);
+ getParameters(attributes, domainRegistryURI);
+ }
+
+ private Map<String, String> getParameters(Map<String, String> attributes, String domainRegistryURI) {
+ Map<String, String> map = new HashMap<String, String>();
+ if (attributes != null) {
+ map.putAll(attributes);
+ }
+ URI uri = URI.create(domainRegistryURI);
+ if (uri.getHost() != null) {
+ map.put("address", uri.getHost());
+ }
+ if (uri.getPort() != -1) {
+ map.put("port", String.valueOf(uri.getPort()));
+ }
+
+ if (domainRegistryURI.startsWith("tuscany")) {
+ setTuscanyConfig(map, domainRegistryURI);
+ setConfig(map);
+ return map;
+ }
+
+ int index = domainRegistryURI.indexOf('?');
+ if (index == -1) {
+ setConfig(map);
+ return map;
+ }
+ String query = domainRegistryURI.substring(index + 1);
+ try {
+ query = URLDecoder.decode(query, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new IllegalArgumentException(e);
+ }
+ String[] params = query.split("&");
+ for (String param : params) {
+ index = param.indexOf('=');
+ if (index != -1) {
+ map.put(param.substring(0, index), param.substring(index + 1));
+ }
+ }
+ setConfig(map);
+ return map;
+ }
+
+ private void setTuscanyConfig(Map<String, String> map, String domainRegistryURI) {
+ DomainRegistryURI tuscanyURI = new DomainRegistryURI(domainRegistryURI);
+ map.put("address", tuscanyURI.getMulticastAddress());
+ map.put("port", Integer.toString(tuscanyURI.getMulticastPort()));
+ map.put("bind", tuscanyURI.getBindAddress());
+ map.put("receiverPort", Integer.toString(tuscanyURI.getListenPort()));
+ if (tuscanyURI.isMulticastDisabled()) {
+ map.put("nomcast", "true");
+ }
+ if (tuscanyURI.getRemotes().size() > 0) {
+ String routes = "";
+ for (int i=0; i<tuscanyURI.getRemotes().size(); i++) {
+ routes += tuscanyURI.getRemotes().get(i);
+ if (i < tuscanyURI.getRemotes().size()) {
+ routes += ",";
+ }
+ }
+ map.put("routes", routes);
+ }
+ }
+
+ private void setConfig(Map<String, String> attributes) {
+ String portStr = attributes.get("port");
+ if (portStr != null) {
+ port = Integer.parseInt(portStr);
+ if (port == -1) {
+ port = MULTICAST_PORT;
+ }
+ }
+ String address = attributes.get("address");
+ if (address == null) {
+ address = MULTICAST_ADDRESS;
+ }
+ bind = attributes.get("bind");
+ String timeoutStr = attributes.get("timeout");
+ if (timeoutStr != null) {
+ timeout = Integer.parseInt(timeoutStr);
+ }
+
+ String routesStr = attributes.get("routes");
+ if (routesStr != null) {
+ StringTokenizer st = new StringTokenizer(routesStr);
+ staticRoutes = new ArrayList<URI>();
+ while (st.hasMoreElements()) {
+ staticRoutes.add(URI.create("tcp://" + st.nextToken()));
+ }
+ }
+ String mcast = attributes.get("nomcast");
+ if (mcast != null) {
+ noMultiCast = Boolean.valueOf(mcast);
+ }
+ receiverAddress = attributes.get("receiverAddress");
+ String recvPort = attributes.get("receiverPort");
+ if (recvPort != null) {
+ receiverPort = Integer.parseInt(recvPort);
+ }
+ String recvAutoBind = attributes.get("receiverAutoBind");
+ if (recvAutoBind != null) {
+ receiverAutoBind = Integer.parseInt(recvAutoBind);
+ }
+ }
+
+ public void start() {
+ if (map != null) {
+ throw new IllegalStateException("The registry has already been started");
+ }
+ GroupChannel channel = createChannel(address, port, bind);
+ map =
+ new ReplicatedMap(null, channel, timeout, this.domainURI,
+ new ClassLoader[] {ReplicatedEndpointRegistry.class.getClassLoader()});
+ map.addListener(this);
+
+ if (noMultiCast) {
+ map.getChannel().addInterceptor(new DisableMcastInterceptor());
+ }
+
+ // Configure the receiver ports
+ ChannelReceiver receiver = channel.getChannelReceiver();
+ if (receiver instanceof ReceiverBase) {
+ if (receiverAddress != null) {
+ ((ReceiverBase)receiver).setAddress(receiverAddress);
+ }
+ ((ReceiverBase)receiver).setPort(receiverPort);
+ ((ReceiverBase)receiver).setAutoBind(receiverAutoBind);
+ }
+
+ /*
+ Object sender = channel.getChannelSender();
+ if (sender instanceof ReplicationTransmitter) {
+ sender = ((ReplicationTransmitter)sender).getTransport();
+ }
+ if (sender instanceof AbstractSender) {
+ ((AbstractSender)sender).setKeepAliveCount(0);
+ ((AbstractSender)sender).setMaxRetryAttempts(5);
+ }
+ */
+
+ if (staticRoutes != null) {
+ StaticMembershipInterceptor smi = new StaticMembershipInterceptor();
+ for (URI staticRoute : staticRoutes) {
+ Member member;
+ try {
+ // The port has to match the receiver port
+ member = new StaticMember(staticRoute.getHost(), staticRoute.getPort(), 5000);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ smi.addStaticMember(member);
+ logger.info("Added static route: " + staticRoute.getHost() + ":" + staticRoute.getPort());
+ }
+ smi.setLocalMember(map.getChannel().getLocalMember(false));
+ map.getChannel().addInterceptor(smi);
+ }
+
+ try {
+ map.getChannel().start(Channel.DEFAULT);
+ } catch (ChannelException e) {
+ throw new IllegalStateException(e);
+ }
+
+ }
+
+ public void stop() {
+ if (map != null) {
+ map.removeListener(this);
+ Channel channel = map.getChannel();
+ map.breakdown();
+ try {
+ channel.stop(Channel.DEFAULT);
+ } catch (ChannelException e) {
+ logger.log(Level.WARNING, e.getMessage(), e);
+ }
+ map = null;
+ }
+ }
+
+ public void addEndpoint(Endpoint endpoint) {
+ map.put(endpoint.getURI(), endpoint);
+ logger.info("Add endpoint - " + endpoint);
+ }
+
+ public List<Endpoint> findEndpoint(String uri) {
+ List<Endpoint> foundEndpoints = new ArrayList<Endpoint>();
+
+ // in the failure case we repeat the look up after a short
+ // delay to take account of tribes replication delays
+ int repeat = FIND_REPEAT_COUNT;
+
+ while (repeat > 0) {
+ for (Object v : map.values()) {
+ Endpoint endpoint = (Endpoint)v;
+ // TODO: implement more complete matching
+ logger.fine("Matching against - " + endpoint);
+ if (endpoint.matches(uri)) {
+ MapEntry entry = map.getInternal(endpoint.getURI());
+ // if (!entry.isPrimary()) {
+ ((RuntimeEndpoint)endpoint).bind(registry, this);
+ // }
+ foundEndpoints.add(endpoint);
+ logger.fine("Found endpoint with matching service - " + endpoint);
+ repeat = 0;
+ }
+ // else the service name doesn't match
+ }
+
+ if (foundEndpoints.size() == 0) {
+ // the service name doesn't match any endpoints so wait a little and try
+ // again in case this is caused by tribes synch delays
+ logger.info("Repeating endpoint reference match - " + uri);
+ repeat--;
+ try {
+ Thread.sleep(1000);
+ } catch (Exception ex) {
+ // do nothing
+ repeat = 0;
+ }
+ }
+ }
+
+ return foundEndpoints;
+ }
+
+ private boolean isLocal(MapEntry entry) {
+ return entry.getPrimary().equals(map.getChannel().getLocalMember(false));
+ }
+
+ public Endpoint getEndpoint(String uri) {
+ return (Endpoint)map.get(uri);
+ }
+
+ public List<Endpoint> getEndpoints() {
+ return new ArrayList(map.values());
+ }
+
+ public void removeEndpoint(Endpoint endpoint) {
+ map.remove(endpoint.getURI());
+ logger.info("Remove endpoint - " + endpoint);
+ }
+
+ public void replicate(boolean complete) {
+ map.replicate(complete);
+ }
+
+ public void updateEndpoint(String uri, Endpoint endpoint) {
+ Endpoint oldEndpoint = getEndpoint(uri);
+ if (oldEndpoint == null) {
+ throw new IllegalArgumentException("Endpoint is not found: " + uri);
+ }
+ map.put(endpoint.getURI(), endpoint);
+ }
+
+ public void entryAdded(Object key, Object value) {
+ MapEntry entry = (MapEntry)value;
+ Endpoint newEp = (Endpoint)entry.getValue();
+ if (!isLocal(entry)) {
+ logger.info(id + " Remote endpoint added: " + entry.getValue());
+ }
+ endpointAdded(newEp);
+ }
+
+ public void entryRemoved(Object key, Object value) {
+ MapEntry entry = (MapEntry)value;
+ if (!isLocal(entry)) {
+ logger.info(id + " Remote endpoint removed: " + entry.getValue());
+ }
+ endpointRemoved((Endpoint)entry.getValue());
+ }
+
+ public void entryUpdated(Object key, Object oldValue, Object newValue) {
+ MapEntry oldEntry = (MapEntry)oldValue;
+ MapEntry newEntry = (MapEntry)newValue;
+ if (!isLocal(newEntry)) {
+ logger.info(id + " Remote endpoint updated: " + newEntry.getValue());
+ }
+ Endpoint oldEp = (Endpoint)oldEntry.getValue();
+ Endpoint newEp = (Endpoint)newEntry.getValue();
+ endpointUpdated(oldEp, newEp);
+ }
+
+ private static String getBindAddress() {
+ try {
+ Enumeration<NetworkInterface> nis = NetworkInterface.getNetworkInterfaces();
+ while (nis.hasMoreElements()) {
+ NetworkInterface ni = nis.nextElement();
+ // The following APIs require JDK 1.6
+ /*
+ if (ni.isLoopback() || !ni.isUp() || !ni.supportsMulticast()) {
+ continue;
+ }
+ */
+ Enumeration<InetAddress> ips = ni.getInetAddresses();
+ if (!ips.hasMoreElements()) {
+ continue;
+ }
+ while (ips.hasMoreElements()) {
+ InetAddress addr = ips.nextElement();
+ if (addr.isLoopbackAddress()) {
+ continue;
+ }
+ return addr.getHostAddress();
+ }
+ }
+ return InetAddress.getLocalHost().getHostAddress();
+ } catch (Exception e) {
+ logger.log(Level.SEVERE, e.getMessage(), e);
+ return null;
+ }
+ }
+
+}
diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java
new file mode 100644
index 0000000000..669ad82192
--- /dev/null
+++ b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tuscany.sca.endpoint.tribes;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.catalina.tribes.group.RpcCallback;
+
+/**
+ * This file is copied from:
+ * https://svn.apache.org/repos/asf/tomcat/tc6.0.x/tags/TOMCAT_6_0_20/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
+ *
+ */
+public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback, ChannelListener, MembershipListener {
+ private static final long serialVersionUID = -6318779627600581121L;
+ protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(ReplicatedMap.class);
+
+ //------------------------------------------------------------------------------
+ // CONSTRUCTORS / DESTRUCTORS
+ //------------------------------------------------------------------------------
+ /**
+ * Creates a new map
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messags
+ * @param mapContextName String - unique name for this map, to allow multiple maps per channel
+ * @param initialCapacity int - the size of this map, see HashMap
+ * @param loadFactor float - load factor, see HashMap
+ */
+ public ReplicatedMap(MapOwner owner,
+ Channel channel,
+ long timeout,
+ String mapContextName,
+ int initialCapacity,
+ float loadFactor,
+ ClassLoader[] cls) {
+ super(owner, channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT, cls);
+ }
+
+ /**
+ * Creates a new map
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messags
+ * @param mapContextName String - unique name for this map, to allow multiple maps per channel
+ * @param initialCapacity int - the size of this map, see HashMap
+ */
+ public ReplicatedMap(MapOwner owner,
+ Channel channel,
+ long timeout,
+ String mapContextName,
+ int initialCapacity,
+ ClassLoader[] cls) {
+ super(owner, channel, timeout, mapContextName, initialCapacity, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR,
+ Channel.SEND_OPTIONS_DEFAULT, cls);
+ }
+
+ /**
+ * Creates a new map
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messags
+ * @param mapContextName String - unique name for this map, to allow multiple maps per channel
+ */
+ public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, ClassLoader[] cls) {
+ super(owner, channel, timeout, mapContextName, AbstractReplicatedMap.DEFAULT_INITIAL_CAPACITY,
+ AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT, cls);
+ }
+
+ //------------------------------------------------------------------------------
+ // METHODS TO OVERRIDE
+ //------------------------------------------------------------------------------
+ protected int getStateMessageType() {
+ return AbstractReplicatedMap.MapMessage.MSG_STATE_COPY;
+ }
+
+ /**
+ * publish info about a map pair (key/value) to other nodes in the cluster
+ * @param key Object
+ * @param value Object
+ * @return Member - the backup node
+ * @throws ChannelException
+ */
+ protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException {
+ if (!(key instanceof Serializable && value instanceof Serializable))
+ return new Member[0];
+ //select a backup node
+ Member[] members = getMapMembers();
+
+ if (members == null || members.length == 0) {
+ return new Member[0];
+ }
+
+ //publish the data out to all nodes
+ MapMessage msg =
+ new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable)key, (Serializable)value,
+ null, channel.getLocalMember(false), members);
+
+ getChannel().send(members, msg, getChannelSendOptions());
+
+ return members;
+ }
+
+ /**
+ * Override the base method to look up existing entries only
+ */
+ public Object get(Object key) {
+ MapEntry entry = super.getInternal(key);
+ if (log.isTraceEnabled())
+ log.trace("Requesting id:" + key + " entry:" + entry);
+ if (entry == null) {
+ return null;
+ }
+ return entry.getValue();
+ }
+
+ /**
+ * Override the base method to remove all entries owned by the member that disappeared
+ */
+ public void memberDisappeared(Member member) {
+ boolean removed = false;
+ synchronized (mapMembers) {
+ removed = (mapMembers.remove(member) != null);
+ if (!removed) {
+ if (log.isDebugEnabled())
+ log.debug("Member[" + member + "] disappeared, but was not present in the map.");
+ return; //the member was not part of our map.
+ }
+ }
+
+ Iterator<Map.Entry<Object, Object>> i = super.entrySetFull().iterator();
+ while (i.hasNext()) {
+ Map.Entry<Object, Object> e = i.next();
+ MapEntry entry = (MapEntry)super.getInternal(e.getKey());
+ if (entry == null) {
+ continue;
+ }
+ if (member.equals(entry.getPrimary())) {
+ if (log.isDebugEnabled())
+ log.debug("[2] Primary disappeared");
+ i.remove();
+ } //end if
+ } //while
+ }
+}
diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/TribesDomainRegistryFactory.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/TribesDomainRegistryFactory.java
new file mode 100644
index 0000000000..91314b4c37
--- /dev/null
+++ b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/TribesDomainRegistryFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.tribes;
+
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.runtime.BaseDomainRegistryFactory;
+import org.apache.tuscany.sca.runtime.EndpointRegistry;
+
+/**
+ * The utility responsible for finding the endpoint regstry by the scheme and creating instances for the
+ * given domain
+ */
+public class TribesDomainRegistryFactory extends BaseDomainRegistryFactory {
+ private final static String[] schemes = new String[] {"multicast", "tribes", "tuscany"};
+
+ /**
+ * @param extensionRegistry
+ */
+ public TribesDomainRegistryFactory(ExtensionPointRegistry registry) {
+ super(registry);
+ }
+
+ protected EndpointRegistry createEndpointRegistry(String endpointRegistryURI, String domainURI) {
+ EndpointRegistry endpointRegistry =
+ new ReplicatedEndpointRegistry(registry, null, endpointRegistryURI, domainURI);
+ return endpointRegistry;
+ }
+
+ public String[] getSupportedSchemes() {
+ return schemes;
+ }
+}
diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.DomainRegistryFactory b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.DomainRegistryFactory
new file mode 100644
index 0000000000..734afb2ac5
--- /dev/null
+++ b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.DomainRegistryFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+org.apache.tuscany.sca.endpoint.tribes.TribesDomainRegistryFactory;ranking=50 \ No newline at end of file
diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java
new file mode 100644
index 0000000000..c28a303070
--- /dev/null
+++ b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.tribes;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.tuscany.sca.assembly.AssemblyFactory;
+import org.apache.tuscany.sca.assembly.Binding;
+import org.apache.tuscany.sca.assembly.Component;
+import org.apache.tuscany.sca.assembly.Endpoint;
+import org.apache.tuscany.sca.assembly.SCABindingFactory;
+import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.FactoryExtensionPoint;
+import org.apache.tuscany.sca.runtime.EndpointListener;
+import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+// Ignore so its not run in the build yet till its working
+@Ignore("TUSCANY-3718")
+public class MultiRegTestCase implements EndpointListener {
+ private static ExtensionPointRegistry extensionPoints;
+ private static AssemblyFactory assemblyFactory;
+ private static SCABindingFactory scaBindingFactory;
+
+ @BeforeClass
+ public static void init() {
+ extensionPoints = new DefaultExtensionPointRegistry();
+ FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class);
+ assemblyFactory = factories.getFactory(AssemblyFactory.class);
+ scaBindingFactory = factories.getFactory(SCABindingFactory.class);
+ }
+
+ @Test
+ public void testReplication() throws Exception {
+ RuntimeEndpoint ep1 = createEndpoint("ep1uri");
+
+ // String host = InetAddress.getLocalHost().getHostAddress();
+ String bind = "127.0.0.1"; // "9.65.158.31";
+ String port1 = "8085";
+ String port2 = "8086";
+ String port3 = "8087";
+ String range = "1";
+
+ Map<String, String> attrs1 = new HashMap<String, String>();
+ // attrs1.put("nomcast", "true");
+ attrs1.put("bind", bind);
+ attrs1.put("receiverPort", port1);
+ attrs1.put("receiverAutoBind", range);
+ // attrs1.put("routes", host + ":" + port2 + " " + host + ":" + port3);
+ ReplicatedEndpointRegistry reg1 = new ReplicatedEndpointRegistry(extensionPoints, attrs1, "foo", "bar");
+ reg1.addListener(this);
+ reg1.start();
+
+ Map<String, String> attrs2 = new HashMap<String, String>();
+ // attrs2.put("nomcast", "true");
+ attrs2.put("bind", bind);
+ attrs2.put("receiverPort", port2);
+ attrs2.put("receiverAutoBind", range);
+ // attrs2.put("routes", host + ":"+port1);
+ ReplicatedEndpointRegistry reg2 = new ReplicatedEndpointRegistry(extensionPoints, attrs2, "foo", "bar");
+ reg2.addListener(this);
+ reg2.start();
+
+ Map<String, String> attrs3 = new HashMap<String, String>();
+ // attrs3.put("nomcast", "true");
+ attrs3.put("bind", bind);
+ attrs3.put("receiverPort", port3);
+ attrs3.put("receiverAutoBind", range);
+ // attrs3.put("routes", host + ":"+port1);
+ ReplicatedEndpointRegistry reg3 = new ReplicatedEndpointRegistry(extensionPoints, attrs3, "foo", "bar");
+ reg3.addListener(this);
+ reg3.start();
+
+ ep1.bind(extensionPoints, reg1);
+ reg1.addEndpoint(ep1);
+ assertExists(reg1, "ep1uri");
+ assertExists(reg2, "ep1uri");
+ assertExists(reg3, "ep1uri");
+
+ RuntimeEndpoint ep2 = createEndpoint("ep2uri");
+ ep2.bind(extensionPoints, reg2);
+ reg2.addEndpoint(ep2);
+ assertExists(reg2, "ep2uri");
+ assertExists(reg1, "ep2uri");
+ assertExists(reg3, "ep2uri");
+
+ reg1.stop();
+ Thread.sleep(6000);
+ Assert.assertNull(reg2.getEndpoint("ep1uri"));
+ Assert.assertNull(reg3.getEndpoint("ep1uri"));
+ assertExists(reg2, "ep2uri");
+ assertExists(reg3, "ep2uri");
+
+ reg1.start();
+ ep1.bind(extensionPoints, reg1);
+ reg1.addEndpoint(ep1);
+ assertExists(reg1, "ep1uri");
+ assertExists(reg2, "ep1uri");
+ assertExists(reg3, "ep1uri");
+
+ reg1.stop();
+ reg2.stop();
+ reg3.stop();
+ System.out.println(); // closed
+ }
+
+ private Endpoint assertExists(ReplicatedEndpointRegistry reg, String uri) throws InterruptedException {
+ Endpoint ep = null;
+ int count = 0;
+ while (ep == null && count < 15) {
+ ep = reg.getEndpoint(uri);
+ Thread.sleep(1000);
+ count++;
+ System.out.println(reg + ": tries=" + count);
+ }
+ Assert.assertNotNull(ep);
+ Assert.assertEquals(uri, ep.getURI());
+ return ep;
+ }
+
+ private RuntimeEndpoint createEndpoint(String uri) {
+ RuntimeEndpoint ep = (RuntimeEndpoint) assemblyFactory.createEndpoint();
+ Component comp = assemblyFactory.createComponent();
+ ep.setComponent(comp);
+ ep.setService(assemblyFactory.createComponentService());
+ Binding b = scaBindingFactory.createSCABinding();
+ ep.setBinding(b);
+ ep.setURI(uri);
+ return ep;
+ }
+
+ private void print(String prefix, Endpoint ep) {
+ System.out.println(prefix + ": "+ep);
+ }
+
+ public void endpointAdded(Endpoint endpoint) {
+ print("Added", endpoint);
+ }
+
+ public void endpointRemoved(Endpoint endpoint) {
+ print("Removed", endpoint);
+ }
+
+ public void endpointUpdated(Endpoint oldEndpoint, Endpoint newEndpoint) {
+ print("Updated", newEndpoint);
+ }
+
+}
diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistryTestCase.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistryTestCase.java
new file mode 100644
index 0000000000..99c5472c82
--- /dev/null
+++ b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistryTestCase.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.tribes;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.tuscany.sca.assembly.AssemblyFactory;
+import org.apache.tuscany.sca.assembly.Endpoint;
+import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry;
+import org.apache.tuscany.sca.core.FactoryExtensionPoint;
+import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore("TUSCANY-3718")
+public class ReplicatedEndpointRegistryTestCase {
+
+ @Test
+ // @Ignore("Ignore this test case for now as it might be sensitive to the multicast settings for a multi-homed machine")
+ public void testReplicate() throws InterruptedException {
+ DefaultExtensionPointRegistry extensionPoints = new DefaultExtensionPointRegistry();
+ FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class);
+ AssemblyFactory assemblyFactory = factories.getFactory(AssemblyFactory.class);
+
+ Map<String, String> attrs = new HashMap<String, String>();
+ attrs.put("bind", "127.0.0.1");
+ ReplicatedEndpointRegistry ep1 = new ReplicatedEndpointRegistry(extensionPoints, attrs, "foo", "bar");
+ System.out.println("ep1 is: " + ep1);
+ ep1.start();
+
+ Endpoint e1 = assemblyFactory.createEndpoint();
+ e1.setURI("e1uri");
+ ((RuntimeEndpoint) e1).bind(extensionPoints, ep1);
+ ep1.addEndpoint(e1);
+
+ Endpoint e1p = ep1.getEndpoint("e1uri");
+ System.out.println("EP1 in Registry 1: " + e1p);
+ Assert.assertNotNull(e1p);
+
+ ReplicatedEndpointRegistry ep2 = new ReplicatedEndpointRegistry(extensionPoints, attrs, "foo", "bar");
+ System.out.println("ep2 is: " + ep2);
+ ep2.start();
+ Thread.sleep(5000);
+
+ Endpoint e1p2 = ep2.getEndpoint("e1uri");
+ System.out.println("EP1 in Registry 2: " + e1p2);
+ Assert.assertNotNull(e1p2);
+
+ ReplicatedEndpointRegistry ep3 = new ReplicatedEndpointRegistry(extensionPoints, attrs, "foo", "bar");
+ System.out.println("ep3 is: " + ep3);
+ ep3.start();
+ Thread.sleep(5000);
+
+ Endpoint e1p3 = ep3.getEndpoint("e1uri");
+ System.out.println("EP1 in Registry 3: " + e1p3);
+ Assert.assertNotNull(e1p3);
+
+ ep1.stop();
+ ep2.stop();
+ ep3.stop();
+ }
+
+ public static void main(String[] args) throws Exception {
+ new ReplicatedEndpointRegistryTestCase().testReplicate();
+ }
+}