From 9860bae75651b7c31cb0a7f4a66bf702712a10db Mon Sep 17 00:00:00 2001 From: rfeng Date: Wed, 24 Jun 2009 22:34:12 +0000 Subject: Allow the replicated map to be listened on the add/remove/update of entries git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@788201 13f79535-47bb-0310-9956-ffa450edef68 --- .../sca/endpoint/tribes/AbstractReplicatedMap.java | 1561 ++++++++++++++++++++ .../tuscany/sca/endpoint/tribes/MapStore.java | 83 ++ .../tribes/ReplicatedEndpointRegistry.java | 99 +- .../tuscany/sca/endpoint/tribes/ReplicatedMap.java | 119 ++ 4 files changed, 1849 insertions(+), 13 deletions(-) create mode 100644 java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java create mode 100644 java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java create mode 100644 java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java (limited to 'java/sca/modules/endpoint-tribes/src/main') diff --git a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java new file mode 100644 index 0000000000..91aee4d585 --- /dev/null +++ b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java @@ -0,0 +1,1561 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tuscany.sca.endpoint.tribes; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelListener; +import org.apache.catalina.tribes.Heartbeat; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.MembershipListener; +import org.apache.catalina.tribes.group.Response; +import org.apache.catalina.tribes.group.RpcCallback; +import org.apache.catalina.tribes.group.RpcChannel; +import org.apache.catalina.tribes.io.XByteBuffer; +import org.apache.catalina.tribes.membership.MemberImpl; +import org.apache.catalina.tribes.tipis.ReplicatedMapEntry; +import org.apache.catalina.tribes.util.Arrays; +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; + +/** + * This file is copied from: + * https://svn.apache.org/repos/asf/tomcat/tc6.0.x/tags/TOMCAT_6_0_20/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java + * + * We need to intercept the put/remove calls on the base Map to fire events + */ +public abstract class AbstractReplicatedMap extends MapStore implements RpcCallback, ChannelListener, + MembershipListener, Heartbeat { + private static final long serialVersionUID = -2703358020113899074L; + + protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class); + + /** + * The default initial capacity - MUST be a power of two. + */ + public static final int DEFAULT_INITIAL_CAPACITY = 16; + + /** + * The load factor used when none specified in constructor. + **/ + public static final float DEFAULT_LOAD_FACTOR = 0.75f; + + /** + * Used to identify the map + */ + final String chset = "ISO-8859-1"; + + //------------------------------------------------------------------------------ + // INSTANCE VARIABLES + //------------------------------------------------------------------------------ + protected abstract int getStateMessageType(); + + /** + * Timeout for RPC messages, how long we will wait for a reply + */ + protected transient long rpcTimeout = 5000; + /** + * Reference to the channel for sending messages + */ + protected transient Channel channel; + /** + * The RpcChannel to send RPC messages through + */ + protected transient RpcChannel rpcChannel; + /** + * The Map context name makes this map unique, this + * allows us to have more than one map shared + * through one channel + */ + protected transient byte[] mapContextName; + /** + * Has the state been transferred + */ + protected transient boolean stateTransferred = false; + /** + * Simple lock object for transfers + */ + protected transient Object stateMutex = new Object(); + /** + * A list of members in our map + */ + protected transient Map mapMembers = new HashMap(); + /** + * Our default send options + */ + protected transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT; + /** + * The owner of this map, ala a SessionManager for example + */ + protected transient MapOwner mapOwner; + /** + * External class loaders if serialization and deserialization is to be performed successfully. + */ + protected transient ClassLoader[] externalLoaders; + + /** + * The node we are currently backing up data to, this index will rotate + * on a round robin basis + */ + protected transient int currentNode = 0; + + /** + * Since the map keeps internal membership + * this is the timeout for a ping message to be responded to + * If a remote map doesn't respond within this timeframe, + * its considered dead. + */ + protected transient long accessTimeout = 5000; + + /** + * Readable string of the mapContextName value + */ + protected transient String mapname = ""; + + //------------------------------------------------------------------------------ + // map owner interface + //------------------------------------------------------------------------------ + + public static interface MapOwner { + public void objectMadePrimay(Object key, Object value); + } + + //------------------------------------------------------------------------------ + // CONSTRUCTORS + //------------------------------------------------------------------------------ + + /** + * Creates a new map + * @param channel The channel to use for communication + * @param timeout long - timeout for RPC messags + * @param mapContextName String - unique name for this map, to allow multiple maps per channel + * @param initialCapacity int - the size of this map, see HashMap + * @param loadFactor float - load factor, see HashMap + * @param cls - a list of classloaders to be used for deserialization of objects. + */ + public AbstractReplicatedMap(MapOwner owner, + Channel channel, + long timeout, + String mapContextName, + int initialCapacity, + float loadFactor, + int channelSendOptions, + ClassLoader[] cls) { + super(initialCapacity, loadFactor, 15); + init(owner, channel, mapContextName, timeout, channelSendOptions, cls); + + } + + /** + * Helper methods, wraps a single member in an array + * @param m Member + * @return Member[] + */ + protected Member[] wrap(Member m) { + if (m == null) + return new Member[0]; + else + return new Member[] {m}; + } + + /** + * Initializes the map by creating the RPC channel, registering itself as a channel listener + * This method is also responsible for initiating the state transfer + * @param owner Object + * @param channel Channel + * @param mapContextName String + * @param timeout long + * @param channelSendOptions int + * @param cls ClassLoader[] + */ + protected void init(MapOwner owner, + Channel channel, + String mapContextName, + long timeout, + int channelSendOptions, + ClassLoader[] cls) { + log.info("Initializing AbstractReplicatedMap with context name:" + mapContextName); + this.mapOwner = owner; + this.externalLoaders = cls; + this.channelSendOptions = channelSendOptions; + this.channel = channel; + this.rpcTimeout = timeout; + + try { + this.mapname = mapContextName; + //unique context is more efficient if it is stored as bytes + this.mapContextName = mapContextName.getBytes(chset); + } catch (UnsupportedEncodingException x) { + log.warn("Unable to encode mapContextName[" + mapContextName + + "] using getBytes(" + + chset + + ") using default getBytes()", x); + this.mapContextName = mapContextName.getBytes(); + } + if (log.isTraceEnabled()) + log.trace("Created Lazy Map with name:" + mapContextName + + ", bytes:" + + Arrays.toString(this.mapContextName)); + + //create an rpc channel and add the map as a listener + this.rpcChannel = new RpcChannel(this.mapContextName, channel, this); + //add this map as a message listener + this.channel.addChannelListener(this); + //listen for membership notifications + this.channel.addMembershipListener(this); + + try { + //broadcast our map, this just notifies other members of our existence + broadcast(MapMessage.MSG_INIT, true); + //transfer state from another map + transferState(); + //state is transferred, we are ready for messaging + broadcast(MapMessage.MSG_START, true); + } catch (ChannelException x) { + log.warn("Unable to send map start message."); + throw new RuntimeException("Unable to start replicated map.", x); + } + } + + /** + * Sends a ping out to all the members in the cluster, not just map members + * that this map is alive. + * @param timeout long + * @throws ChannelException + */ + protected void ping(long timeout) throws ChannelException { + //send out a map membership message, only wait for the first reply + MapMessage msg = + new MapMessage(this.mapContextName, MapMessage.MSG_INIT, false, null, null, null, channel + .getLocalMember(false), null); + if (channel.getMembers().length > 0) { + //send a ping, wait for all nodes to reply + Response[] resp = + rpcChannel.send(channel.getMembers(), + msg, + rpcChannel.ALL_REPLY, + (channelSendOptions), + (int)accessTimeout); + for (int i = 0; i < resp.length; i++) { + memberAlive(resp[i].getSource()); + } //for + } + //update our map of members, expire some if we didn't receive a ping back + synchronized (mapMembers) { + Iterator it = mapMembers.entrySet().iterator(); + long now = System.currentTimeMillis(); + while (it.hasNext()) { + Map.Entry entry = (Map.Entry)it.next(); + long access = ((Long)entry.getValue()).longValue(); + if ((now - access) > timeout) { + it.remove(); + memberDisappeared((Member)entry.getKey()); + } + } + }//synch + } + + /** + * We have received a member alive notification + * @param member Member + */ + protected void memberAlive(Member member) { + synchronized (mapMembers) { + if (!mapMembers.containsKey(member)) { + mapMemberAdded(member); + } //end if + mapMembers.put(member, new Long(System.currentTimeMillis())); + } + } + + /** + * Helper method to broadcast a message to all members in a channel + * @param msgtype int + * @param rpc boolean + * @throws ChannelException + */ + protected void broadcast(int msgtype, boolean rpc) throws ChannelException { + //send out a map membership message, only wait for the first reply + MapMessage msg = + new MapMessage(this.mapContextName, msgtype, false, null, null, null, channel.getLocalMember(false), null); + if (rpc) { + Response[] resp = + rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions), rpcTimeout); + for (int i = 0; i < resp.length; i++) { + mapMemberAdded(resp[i].getSource()); + messageReceived(resp[i].getMessage(), resp[i].getSource()); + } + } else { + channel.send(channel.getMembers(), msg, channelSendOptions); + } + } + + public void breakdown() { + finalize(); + } + + public void finalize() { + try { + broadcast(MapMessage.MSG_STOP, false); + } catch (Exception ignore) { + } + //cleanup + if (this.rpcChannel != null) { + this.rpcChannel.breakdown(); + } + if (this.channel != null) { + this.channel.removeChannelListener(this); + this.channel.removeMembershipListener(this); + } + this.rpcChannel = null; + this.channel = null; + this.mapMembers.clear(); + super.clear(); + this.stateTransferred = false; + this.externalLoaders = null; + } + + public int hashCode() { + return Arrays.hashCode(this.mapContextName); + } + + public boolean equals(Object o) { + if (o == null) + return false; + if (!(o instanceof AbstractReplicatedMap)) + return false; + if (!(o.getClass().equals(this.getClass()))) + return false; + AbstractReplicatedMap other = (AbstractReplicatedMap)o; + return Arrays.equals(mapContextName, other.mapContextName); + } + + //------------------------------------------------------------------------------ + // GROUP COM INTERFACES + //------------------------------------------------------------------------------ + public Member[] getMapMembers(Map members) { + synchronized (members) { + Member[] result = new Member[members.size()]; + members.keySet().toArray(result); + return result; + } + } + + public Member[] getMapMembers() { + return getMapMembers(this.mapMembers); + } + + public Member[] getMapMembersExcl(Member[] exclude) { + synchronized (mapMembers) { + Map list = (Map)((HashMap)mapMembers).clone(); + for (int i = 0; i < exclude.length; i++) + list.remove(exclude[i]); + return getMapMembers(list); + } + } + + /** + * Replicates any changes to the object since the last time + * The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated
+ * @param complete - if set to true, the object is replicated to its backup + * if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will + * be replicated + */ + public void replicate(Object key, boolean complete) { + if (log.isTraceEnabled()) + log.trace("Replicate invoked on key:" + key); + MapEntry entry = (MapEntry)super.get(key); + if (entry == null) + return; + if (!entry.isSerializable()) + return; + if (entry != null && entry.isPrimary() && entry.getBackupNodes() != null && entry.getBackupNodes().length > 0) { + Object value = entry.getValue(); + //check to see if we need to replicate this object isDirty()||complete + boolean repl = complete || ((value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDirty()); + + if (!repl) { + if (log.isTraceEnabled()) + log.trace("Not replicating:" + key + ", no change made"); + + return; + } + //check to see if the message is diffable + boolean diff = ((value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDiffable()); + MapMessage msg = null; + if (diff) { + ReplicatedMapEntry rentry = (ReplicatedMapEntry)entry.getValue(); + try { + rentry.lock(); + //construct a diff message + msg = + new MapMessage(mapContextName, MapMessage.MSG_BACKUP, true, (Serializable)entry.getKey(), null, + rentry.getDiff(), entry.getPrimary(), entry.getBackupNodes()); + } catch (IOException x) { + log.error("Unable to diff object. Will replicate the entire object instead.", x); + } finally { + rentry.unlock(); + } + + } + if (msg == null) { + //construct a complete + msg = + new MapMessage(mapContextName, MapMessage.MSG_BACKUP, false, (Serializable)entry.getKey(), + (Serializable)entry.getValue(), null, entry.getPrimary(), entry.getBackupNodes()); + + } + try { + if (channel != null && entry.getBackupNodes() != null && entry.getBackupNodes().length > 0) { + channel.send(entry.getBackupNodes(), msg, channelSendOptions); + } + } catch (ChannelException x) { + log.error("Unable to replicate data.", x); + } + } //end if + + } + + /** + * This can be invoked by a periodic thread to replicate out any changes. + * For maps that don't store objects that implement ReplicatedMapEntry, this + * method should be used infrequently to avoid large amounts of data transfer + * @param complete boolean + */ + public void replicate(boolean complete) { + Iterator> i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry e = i.next(); + replicate(e.getKey(), complete); + } //while + + } + + public void transferState() { + try { + Member[] members = getMapMembers(); + Member backup = members.length > 0 ? (Member)members[0] : null; + if (backup != null) { + MapMessage msg = + new MapMessage(mapContextName, getStateMessageType(), false, null, null, null, null, null); + Response[] resp = + rpcChannel.send(new Member[] {backup}, msg, RpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout); + if (resp.length > 0) { + synchronized (stateMutex) { + msg = (MapMessage)resp[0].getMessage(); + msg.deserialize(getExternalLoaders()); + List list = (List)msg.getValue(); + for (int i = 0; i < list.size(); i++) { + messageReceived((Serializable)list.get(i), resp[0].getSource()); + } //for + } + } else { + log.warn("Transfer state, 0 replies, probably a timeout."); + } + } + } catch (ChannelException x) { + log.error("Unable to transfer LazyReplicatedMap state.", x); + } catch (IOException x) { + log.error("Unable to transfer LazyReplicatedMap state.", x); + } catch (ClassNotFoundException x) { + log.error("Unable to transfer LazyReplicatedMap state.", x); + } + stateTransferred = true; + } + + /** + * @todo implement state transfer + * @param msg Serializable + * @return Serializable - null if no reply should be sent + */ + public Serializable replyRequest(Serializable msg, final Member sender) { + if (!(msg instanceof MapMessage)) + return null; + MapMessage mapmsg = (MapMessage)msg; + + //map init request + if (mapmsg.getMsgType() == MapMessage.MSG_INIT) { + mapmsg.setPrimary(channel.getLocalMember(false)); + return mapmsg; + } + + //map start request + if (mapmsg.getMsgType() == MapMessage.MSG_START) { + mapmsg.setPrimary(channel.getLocalMember(false)); + mapMemberAdded(sender); + return mapmsg; + } + + //backup request + if (mapmsg.getMsgType() == MapMessage.MSG_RETRIEVE_BACKUP) { + MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); + if (entry == null || (!entry.isSerializable())) + return null; + mapmsg.setValue((Serializable)entry.getValue()); + return mapmsg; + } + + //state transfer request + if (mapmsg.getMsgType() == MapMessage.MSG_STATE || mapmsg.getMsgType() == MapMessage.MSG_STATE_COPY) { + synchronized (stateMutex) { //make sure we dont do two things at the same time + ArrayList list = new ArrayList(); + Iterator> i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry e = i.next(); + MapEntry entry = (MapEntry)super.get(e.getKey()); + if (entry != null && entry.isSerializable()) { + boolean copy = (mapmsg.getMsgType() == MapMessage.MSG_STATE_COPY); + MapMessage me = + new MapMessage(mapContextName, copy ? MapMessage.MSG_COPY : MapMessage.MSG_PROXY, false, + (Serializable)entry.getKey(), copy ? (Serializable)entry.getValue() : null, + null, entry.getPrimary(), entry.getBackupNodes()); + list.add(me); + } + } + mapmsg.setValue(list); + return mapmsg; + + } //synchronized + } + + return null; + + } + + /** + * If the reply has already been sent to the requesting thread, + * the rpc callback can handle any data that comes in after the fact. + * @param msg Serializable + * @param sender Member + */ + public void leftOver(Serializable msg, Member sender) { + //left over membership messages + if (!(msg instanceof MapMessage)) + return; + + MapMessage mapmsg = (MapMessage)msg; + try { + mapmsg.deserialize(getExternalLoaders()); + if (mapmsg.getMsgType() == MapMessage.MSG_START) { + mapMemberAdded(mapmsg.getPrimary()); + } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) { + memberAlive(mapmsg.getPrimary()); + } + } catch (IOException x) { + log.error("Unable to deserialize MapMessage.", x); + } catch (ClassNotFoundException x) { + log.error("Unable to deserialize MapMessage.", x); + } + } + + public void messageReceived(Serializable msg, Member sender) { + if (!(msg instanceof MapMessage)) + return; + + MapMessage mapmsg = (MapMessage)msg; + if (log.isTraceEnabled()) { + log.trace("Map[" + mapname + "] received message:" + mapmsg); + } + + try { + mapmsg.deserialize(getExternalLoaders()); + } catch (IOException x) { + log.error("Unable to deserialize MapMessage.", x); + return; + } catch (ClassNotFoundException x) { + log.error("Unable to deserialize MapMessage.", x); + return; + } + if (log.isTraceEnabled()) + log.trace("Map message received from:" + sender.getName() + " msg:" + mapmsg); + if (mapmsg.getMsgType() == MapMessage.MSG_START) { + mapMemberAdded(mapmsg.getPrimary()); + } + + if (mapmsg.getMsgType() == MapMessage.MSG_STOP) { + memberDisappeared(mapmsg.getPrimary()); + } + + if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) { + MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); + if (entry == null) { + entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue()); + entry.setBackup(false); + entry.setProxy(true); + entry.setBackupNodes(mapmsg.getBackupNodes()); + entry.setPrimary(mapmsg.getPrimary()); + super.put(entry.getKey(), entry); + } else { + entry.setProxy(true); + entry.setBackup(false); + entry.setBackupNodes(mapmsg.getBackupNodes()); + entry.setPrimary(mapmsg.getPrimary()); + } + } + + if (mapmsg.getMsgType() == MapMessage.MSG_REMOVE) { + super.remove(mapmsg.getKey()); + } + + if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP || mapmsg.getMsgType() == MapMessage.MSG_COPY) { + MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); + if (entry == null) { + entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue()); + entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP); + entry.setProxy(false); + entry.setBackupNodes(mapmsg.getBackupNodes()); + entry.setPrimary(mapmsg.getPrimary()); + if (mapmsg.getValue() != null && mapmsg.getValue() instanceof ReplicatedMapEntry) { + ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner()); + } + } else { + entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP); + entry.setProxy(false); + entry.setBackupNodes(mapmsg.getBackupNodes()); + entry.setPrimary(mapmsg.getPrimary()); + if (entry.getValue() instanceof ReplicatedMapEntry) { + ReplicatedMapEntry diff = (ReplicatedMapEntry)entry.getValue(); + if (mapmsg.isDiff()) { + try { + diff.lock(); + diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length); + } catch (Exception x) { + log.error("Unable to apply diff to key:" + entry.getKey(), x); + } finally { + diff.unlock(); + } + } else { + if (mapmsg.getValue() != null) + entry.setValue(mapmsg.getValue()); + ((ReplicatedMapEntry)entry.getValue()).setOwner(getMapOwner()); + } //end if + } else if (mapmsg.getValue() instanceof ReplicatedMapEntry) { + ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue(); + re.setOwner(getMapOwner()); + entry.setValue(re); + } else { + if (mapmsg.getValue() != null) + entry.setValue(mapmsg.getValue()); + } //end if + } //end if + super.put(entry.getKey(), entry); + } //end if + } + + public boolean accept(Serializable msg, Member sender) { + boolean result = false; + if (msg instanceof MapMessage) { + if (log.isTraceEnabled()) + log.trace("Map[" + mapname + "] accepting...." + msg); + result = Arrays.equals(mapContextName, ((MapMessage)msg).getMapId()); + if (log.isTraceEnabled()) + log.trace("Msg[" + mapname + "] accepted[" + result + "]...." + msg); + } + return result; + } + + public void mapMemberAdded(Member member) { + if (member.equals(getChannel().getLocalMember(false))) + return; + boolean memberAdded = false; + //select a backup node if we don't have one + synchronized (mapMembers) { + if (!mapMembers.containsKey(member)) { + mapMembers.put(member, new Long(System.currentTimeMillis())); + memberAdded = true; + } + } + if (memberAdded) { + synchronized (stateMutex) { + Iterator> i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry e = i.next(); + MapEntry entry = (MapEntry)super.get(e.getKey()); + if (entry == null) + continue; + if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) { + try { + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); + entry.setBackupNodes(backup); + entry.setPrimary(channel.getLocalMember(false)); + } catch (ChannelException x) { + log.error("Unable to select backup node.", x); + } //catch + } //end if + } //while + } //synchronized + }//end if + } + + public boolean inSet(Member m, Member[] set) { + if (set == null) + return false; + boolean result = false; + for (int i = 0; i < set.length && (!result); i++) + if (m.equals(set[i])) + result = true; + return result; + } + + public Member[] excludeFromSet(Member[] mbrs, Member[] set) { + List result = new ArrayList(); + for (int i = 0; i < set.length; i++) { + boolean include = true; + for (int j = 0; j < mbrs.length; j++) + if (mbrs[j].equals(set[i])) + include = false; + if (include) + result.add(set[i]); + } + return (Member[])result.toArray(new Member[result.size()]); + } + + public void memberAdded(Member member) { + //do nothing + } + + public void memberDisappeared(Member member) { + boolean removed = false; + synchronized (mapMembers) { + removed = (mapMembers.remove(member) != null); + if (!removed) { + if (log.isDebugEnabled()) + log.debug("Member[" + member + "] disappeared, but was not present in the map."); + return; //the member was not part of our map. + } + } + + Iterator> i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry e = i.next(); + MapEntry entry = (MapEntry)super.get(e.getKey()); + if (entry == null) + continue; + if (entry.isPrimary() && inSet(member, entry.getBackupNodes())) { + if (log.isDebugEnabled()) + log.debug("[1] Primary choosing a new backup"); + try { + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); + entry.setBackupNodes(backup); + entry.setPrimary(channel.getLocalMember(false)); + } catch (ChannelException x) { + log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x); + } + } else if (member.equals(entry.getPrimary())) { + if (log.isDebugEnabled()) + log.debug("[2] Primary disappeared"); + entry.setPrimary(null); + } //end if + + if (entry.isProxy() && entry.getPrimary() == null + && entry.getBackupNodes() != null + && entry.getBackupNodes().length == 1 + && entry.getBackupNodes()[0].equals(member)) { + //remove proxies that have no backup nor primaries + if (log.isDebugEnabled()) + log.debug("[3] Removing orphaned proxy"); + i.remove(); + } else if (entry.getPrimary() == null && entry.isBackup() + && entry.getBackupNodes() != null + && entry.getBackupNodes().length == 1 + && entry.getBackupNodes()[0].equals(channel.getLocalMember(false))) { + try { + if (log.isDebugEnabled()) + log.debug("[4] Backup becoming primary"); + entry.setPrimary(channel.getLocalMember(false)); + entry.setBackup(false); + entry.setProxy(false); + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); + entry.setBackupNodes(backup); + if (mapOwner != null) + mapOwner.objectMadePrimay(entry.getKey(), entry.getValue()); + + } catch (ChannelException x) { + log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x); + } + } + + } //while + } + + public int getNextBackupIndex() { + int size = mapMembers.size(); + if (mapMembers.size() == 0) + return -1; + int node = currentNode++; + if (node >= size) { + node = 0; + currentNode = 0; + } + return node; + } + + public Member getNextBackupNode() { + Member[] members = getMapMembers(); + int node = getNextBackupIndex(); + if (members.length == 0 || node == -1) + return null; + if (node >= members.length) + node = 0; + return members[node]; + } + + protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException; + + public void heartbeat() { + try { + ping(accessTimeout); + } catch (Exception x) { + log.error("Unable to send AbstractReplicatedMap.ping message", x); + } + } + + //------------------------------------------------------------------------------ + // METHODS TO OVERRIDE + //------------------------------------------------------------------------------ + + /** + * Removes an object from this map, it will also remove it from + * + * @param key Object + * @return Object + */ + public Object remove(Object key) { + return remove(key, true); + } + + public Object remove(Object key, boolean notify) { + MapEntry entry = (MapEntry)super.remove(key); + + try { + if (getMapMembers().length > 0 && notify) { + MapMessage msg = + new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable)key, null, null, + null, null); + getChannel().send(getMapMembers(), msg, getChannelSendOptions()); + } + } catch (ChannelException x) { + log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation", x); + } + return entry != null ? entry.getValue() : null; + } + + public MapEntry getInternal(Object key) { + return (MapEntry)super.get(key); + } + + public Object get(Object key) { + MapEntry entry = (MapEntry)super.get(key); + if (log.isTraceEnabled()) + log.trace("Requesting id:" + key + " entry:" + entry); + if (entry == null) + return null; + if (!entry.isPrimary()) { + //if the message is not primary, we need to retrieve the latest value + try { + Member[] backup = null; + MapMessage msg = null; + if (!entry.isBackup()) { + //make sure we don't retrieve from ourselves + msg = + new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false, (Serializable)key, + null, null, null, null); + Response[] resp = + getRpcChannel().send(entry.getBackupNodes(), + msg, + RpcChannel.FIRST_REPLY, + Channel.SEND_OPTIONS_DEFAULT, + getRpcTimeout()); + if (resp == null || resp.length == 0) { + //no responses + log.warn("Unable to retrieve remote object for key:" + key); + return null; + } + msg = (MapMessage)resp[0].getMessage(); + msg.deserialize(getExternalLoaders()); + backup = entry.getBackupNodes(); + if (entry.getValue() instanceof ReplicatedMapEntry) { + ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue(); + val.setOwner(getMapOwner()); + } + if (msg.getValue() != null) + entry.setValue(msg.getValue()); + } + if (entry.isBackup()) { + //select a new backup node + backup = publishEntryInfo(key, entry.getValue()); + } else if (entry.isProxy()) { + //invalidate the previous primary + msg = + new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false, (Serializable)key, null, null, + channel.getLocalMember(false), backup); + Member[] dest = getMapMembersExcl(backup); + if (dest != null && dest.length > 0) { + getChannel().send(dest, msg, getChannelSendOptions()); + } + } + entry.setPrimary(channel.getLocalMember(false)); + entry.setBackupNodes(backup); + entry.setBackup(false); + entry.setProxy(false); + + } catch (Exception x) { + log.error("Unable to replicate out data for a LazyReplicatedMap.get operation", x); + return null; + } + } + if (log.isTraceEnabled()) + log.trace("Requesting id:" + key + " result:" + entry.getValue()); + if (entry.getValue() != null && entry.getValue() instanceof ReplicatedMapEntry) { + ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue(); + //hack, somehow this is not being set above + val.setOwner(getMapOwner()); + + } + return entry.getValue(); + } + + protected void printMap(String header) { + try { + System.out.println("\nDEBUG MAP:" + header); + System.out.println("Map[" + new String(mapContextName, chset) + ", Map Size:" + super.size()); + Member[] mbrs = getMapMembers(); + for (int i = 0; i < mbrs.length; i++) { + System.out.println("Mbr[" + (i + 1) + "=" + mbrs[i].getName()); + } + Iterator i = super.entrySet().iterator(); + int cnt = 0; + + while (i.hasNext()) { + Map.Entry e = (Map.Entry)i.next(); + System.out.println((++cnt) + ". " + super.get(e.getKey())); + } + System.out.println("EndMap]\n\n"); + } catch (Exception ignore) { + ignore.printStackTrace(); + } + } + + /** + * Returns true if the key has an entry in the map. + * The entry can be a proxy or a backup entry, invoking get(key) + * will make this entry primary for the group + * @param key Object + * @return boolean + */ + public boolean containsKey(Object key) { + return super.containsKey(key); + } + + public Object put(Object key, Object value) { + return put(key, value, true); + } + + public Object put(Object key, Object value, boolean notify) { + MapEntry entry = new MapEntry(key, value); + entry.setBackup(false); + entry.setProxy(false); + entry.setPrimary(channel.getLocalMember(false)); + + Object old = null; + + //make sure that any old values get removed + if (containsKey(key)) + old = remove(key); + try { + if (notify) { + Member[] backup = publishEntryInfo(key, value); + entry.setBackupNodes(backup); + } + } catch (ChannelException x) { + log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x); + } + super.put(key, entry); + return old; + } + + /** + * Copies all values from one map to this instance + * @param m Map + */ + public void putAll(Map m) { + Iterator i = m.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry entry = (Map.Entry)i.next(); + put(entry.getKey(), entry.getValue()); + } + } + + public void clear() { + clear(true); + } + + public void clear(boolean notify) { + if (notify) { + //only delete active keys + Iterator keys = keySet().iterator(); + while (keys.hasNext()) + remove(keys.next()); + } else { + super.clear(); + } + } + + public boolean containsValue(Object value) { + if (value == null) { + return super.containsValue(value); + } else { + Iterator> i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry e = i.next(); + MapEntry entry = (MapEntry)super.get(e.getKey()); + if (entry != null && entry.isPrimary() && value.equals(entry.getValue())) + return true; + }//while + return false; + }//end if + } + + public Object clone() { + throw new UnsupportedOperationException("This operation is not valid on a replicated map"); + } + + /** + * Returns the entire contents of the map + * Map.Entry.getValue() will return a LazyReplicatedMap.MapEntry object containing all the information + * about the object. + * @return Set + */ + public Set> entrySetFull() { + return super.entrySet(); + } + + public Set keySetFull() { + return super.keySet(); + } + + public int sizeFull() { + return super.size(); + } + + public Set> entrySet() { + Set> set = new LinkedHashSet>(super.size()); + Iterator> i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry e = i.next(); + Object key = e.getKey(); + MapEntry entry = (MapEntry)super.get(key); + if (entry != null && entry.isPrimary()) { + set.add(new MapEntry(key, entry.getValue())); + } + } + return Collections.unmodifiableSet(set); + } + + public Set keySet() { + //todo implement + //should only return keys where this is active. + LinkedHashSet set = new LinkedHashSet(super.size()); + Iterator> i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry e = i.next(); + Object key = e.getKey(); + MapEntry entry = (MapEntry)super.get(key); + if (entry != null && entry.isPrimary()) + set.add(key); + } + return Collections.unmodifiableSet(set); + + } + + public int size() { + //todo, implement a counter variable instead + //only count active members in this node + int counter = 0; + Iterator> it = super.entrySet().iterator(); + while (it != null && it.hasNext()) { + Map.Entry e = it.next(); + if (e != null) { + MapEntry entry = (MapEntry)super.get(e.getKey()); + if (entry != null && entry.isPrimary() && entry.getValue() != null) + counter++; + } + } + return counter; + } + + protected boolean removeEldestEntry(Map.Entry eldest) { + return false; + } + + public boolean isEmpty() { + return size() == 0; + } + + public Collection values() { + ArrayList values = new ArrayList(); + Iterator> i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry e = i.next(); + MapEntry entry = (MapEntry)super.get(e.getKey()); + if (entry != null && entry.isPrimary() && entry.getValue() != null) + values.add(entry.getValue()); + } + return Collections.unmodifiableCollection(values); + } + + //------------------------------------------------------------------------------ + // Map Entry class + //------------------------------------------------------------------------------ + public static class MapEntry implements Map.Entry { + private boolean backup; + private boolean proxy; + private Member[] backupNodes; + private Member primary; + private Object key; + private Object value; + + public MapEntry(Object key, Object value) { + setKey(key); + setValue(value); + + } + + public boolean isKeySerializable() { + return (key == null) || (key instanceof Serializable); + } + + public boolean isValueSerializable() { + return (value == null) || (value instanceof Serializable); + } + + public boolean isSerializable() { + return isKeySerializable() && isValueSerializable(); + } + + public boolean isBackup() { + return backup; + } + + public void setBackup(boolean backup) { + this.backup = backup; + } + + public boolean isProxy() { + return proxy; + } + + public boolean isPrimary() { + return ((!proxy) && (!backup)); + } + + public void setProxy(boolean proxy) { + this.proxy = proxy; + } + + public boolean isDiffable() { + return (value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDiffable(); + } + + public void setBackupNodes(Member[] nodes) { + this.backupNodes = nodes; + } + + public Member[] getBackupNodes() { + return backupNodes; + } + + public void setPrimary(Member m) { + primary = m; + } + + public Member getPrimary() { + return primary; + } + + public Object getValue() { + return value; + } + + public Object setValue(Object value) { + Object old = this.value; + this.value = value; + return old; + } + + public Object getKey() { + return key; + } + + public Object setKey(Object key) { + Object old = this.key; + this.key = key; + return old; + } + + public int hashCode() { + return key.hashCode(); + } + + public boolean equals(Object o) { + return key.equals(o); + } + + /** + * apply a diff, or an entire object + * @param data byte[] + * @param offset int + * @param length int + * @param diff boolean + * @throws IOException + * @throws ClassNotFoundException + */ + public void apply(byte[] data, int offset, int length, boolean diff) throws IOException, ClassNotFoundException { + if (isDiffable() && diff) { + ReplicatedMapEntry rentry = (ReplicatedMapEntry)value; + try { + rentry.lock(); + rentry.applyDiff(data, offset, length); + } finally { + rentry.unlock(); + } + } else if (length == 0) { + value = null; + proxy = true; + } else { + value = XByteBuffer.deserialize(data, offset, length); + } + } + + public String toString() { + StringBuffer buf = new StringBuffer("MapEntry[key:"); + buf.append(getKey()).append("; "); + buf.append("value:").append(getValue()).append("; "); + buf.append("primary:").append(isPrimary()).append("; "); + buf.append("backup:").append(isBackup()).append("; "); + buf.append("proxy:").append(isProxy()).append(";]"); + return buf.toString(); + } + + } + + //------------------------------------------------------------------------------ + // map message to send to and from other maps + //------------------------------------------------------------------------------ + + public static class MapMessage implements Serializable { + private static final long serialVersionUID = -7847288807489375686L; + public static final int MSG_BACKUP = 1; + public static final int MSG_RETRIEVE_BACKUP = 2; + public static final int MSG_PROXY = 3; + public static final int MSG_REMOVE = 4; + public static final int MSG_STATE = 5; + public static final int MSG_START = 6; + public static final int MSG_STOP = 7; + public static final int MSG_INIT = 8; + public static final int MSG_COPY = 9; + public static final int MSG_STATE_COPY = 10; + + private byte[] mapId; + private int msgtype; + private boolean diff; + private transient Serializable key; + private transient Serializable value; + private byte[] valuedata; + private byte[] keydata; + private byte[] diffvalue; + private Member[] nodes; + private Member primary; + + public String toString() { + StringBuffer buf = new StringBuffer("MapMessage[context="); + buf.append(new String(mapId)); + buf.append("; type="); + buf.append(getTypeDesc()); + buf.append("; key="); + buf.append(key); + buf.append("; value="); + buf.append(value); + return buf.toString(); + } + + public String getTypeDesc() { + switch (msgtype) { + case MSG_BACKUP: + return "MSG_BACKUP"; + case MSG_RETRIEVE_BACKUP: + return "MSG_RETRIEVE_BACKUP"; + case MSG_PROXY: + return "MSG_PROXY"; + case MSG_REMOVE: + return "MSG_REMOVE"; + case MSG_STATE: + return "MSG_STATE"; + case MSG_START: + return "MSG_START"; + case MSG_STOP: + return "MSG_STOP"; + case MSG_INIT: + return "MSG_INIT"; + case MSG_STATE_COPY: + return "MSG_STATE_COPY"; + case MSG_COPY: + return "MSG_COPY"; + default: + return "UNKNOWN"; + } + } + + public MapMessage() { + } + + public MapMessage(byte[] mapId, + int msgtype, + boolean diff, + Serializable key, + Serializable value, + byte[] diffvalue, + Member primary, + Member[] nodes) { + this.mapId = mapId; + this.msgtype = msgtype; + this.diff = diff; + this.key = key; + this.value = value; + this.diffvalue = diffvalue; + this.nodes = nodes; + this.primary = primary; + setValue(value); + setKey(key); + } + + public void deserialize(ClassLoader[] cls) throws IOException, ClassNotFoundException { + key(cls); + value(cls); + } + + public int getMsgType() { + return msgtype; + } + + public boolean isDiff() { + return diff; + } + + public Serializable getKey() { + try { + return key(null); + } catch (Exception x) { + log.error("Deserialization error of the MapMessage.key", x); + return null; + } + } + + public Serializable key(ClassLoader[] cls) throws IOException, ClassNotFoundException { + if (key != null) + return key; + if (keydata == null || keydata.length == 0) + return null; + key = XByteBuffer.deserialize(keydata, 0, keydata.length, cls); + keydata = null; + return key; + } + + public byte[] getKeyData() { + return keydata; + } + + public Serializable getValue() { + try { + return value(null); + } catch (Exception x) { + log.error("Deserialization error of the MapMessage.value", x); + return null; + } + } + + public Serializable value(ClassLoader[] cls) throws IOException, ClassNotFoundException { + if (value != null) + return value; + if (valuedata == null || valuedata.length == 0) + return null; + value = XByteBuffer.deserialize(valuedata, 0, valuedata.length, cls); + valuedata = null; + ; + return value; + } + + public byte[] getValueData() { + return valuedata; + } + + public byte[] getDiffValue() { + return diffvalue; + } + + public Member[] getBackupNodes() { + return nodes; + } + + private void setBackUpNodes(Member[] nodes) { + this.nodes = nodes; + } + + public Member getPrimary() { + return primary; + } + + private void setPrimary(Member m) { + primary = m; + } + + public byte[] getMapId() { + return mapId; + } + + public void setValue(Serializable value) { + try { + if (value != null) + valuedata = XByteBuffer.serialize(value); + this.value = value; + } catch (IOException x) { + throw new RuntimeException(x); + } + } + + public void setKey(Serializable key) { + try { + if (key != null) + keydata = XByteBuffer.serialize(key); + this.key = key; + } catch (IOException x) { + throw new RuntimeException(x); + } + } + + protected Member[] readMembers(ObjectInput in) throws IOException, ClassNotFoundException { + int nodecount = in.readInt(); + Member[] members = new Member[nodecount]; + for (int i = 0; i < members.length; i++) { + byte[] d = new byte[in.readInt()]; + in.read(d); + if (d.length > 0) + members[i] = MemberImpl.getMember(d); + } + return members; + } + + protected void writeMembers(ObjectOutput out, Member[] members) throws IOException { + if (members == null) + members = new Member[0]; + out.writeInt(members.length); + for (int i = 0; i < members.length; i++) { + if (members[i] != null) { + byte[] d = members[i] != null ? ((MemberImpl)members[i]).getData(false) : new byte[0]; + out.writeInt(d.length); + out.write(d); + } + } + } + + /** + * shallow clone + * @return Object + */ + public Object clone() { + MapMessage msg = + new MapMessage(this.mapId, this.msgtype, this.diff, this.key, this.value, this.diffvalue, this.primary, + this.nodes); + msg.keydata = this.keydata; + msg.valuedata = this.valuedata; + return msg; + } + } //MapMessage + + public Channel getChannel() { + return channel; + } + + public byte[] getMapContextName() { + return mapContextName; + } + + public RpcChannel getRpcChannel() { + return rpcChannel; + } + + public long getRpcTimeout() { + return rpcTimeout; + } + + public Object getStateMutex() { + return stateMutex; + } + + public boolean isStateTransferred() { + return stateTransferred; + } + + public MapOwner getMapOwner() { + return mapOwner; + } + + public ClassLoader[] getExternalLoaders() { + return externalLoaders; + } + + public int getChannelSendOptions() { + return channelSendOptions; + } + + public long getAccessTimeout() { + return accessTimeout; + } + + public void setMapOwner(MapOwner mapOwner) { + this.mapOwner = mapOwner; + } + + public void setExternalLoaders(ClassLoader[] externalLoaders) { + this.externalLoaders = externalLoaders; + } + + public void setChannelSendOptions(int channelSendOptions) { + this.channelSendOptions = channelSendOptions; + } + + public void setAccessTimeout(long accessTimeout) { + this.accessTimeout = accessTimeout; + } + +} diff --git a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java new file mode 100644 index 0000000000..d667f10c50 --- /dev/null +++ b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.endpoint.tribes; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * The Map that can fire events on put/remove of entries + */ +public abstract class MapStore extends ConcurrentHashMap { + private static final long serialVersionUID = -2127235547082144368L; + private List listeners = new CopyOnWriteArrayList(); + + protected MapStore(int initialCapacity, float loadFactor, int concurrencyLevel) { + super(initialCapacity, loadFactor, concurrencyLevel); + } + + @Override + public Object put(Object key, Object value) { + Object old = super.put(key, value); + if (old != null) { + for (MapListener listener : listeners) { + listener.entryUpdated(key, old, value); + } + } else { + for (MapListener listener : listeners) { + listener.entryAdded(key, value); + } + + } + return old; + } + + @Override + public Object remove(Object key) { + Object old = super.remove(key); + if (old != null) { + for (MapListener listener : listeners) { + listener.entryRemoved(key, old); + } + } + return old; + } + + public void addListener(MapListener listener) { + listeners.add(listener); + } + + public List getListeners() { + return listeners; + } + + public boolean removeListener(MapListener listener) { + return listeners.remove(listener); + } + + public static interface MapListener { + void entryAdded(Object key, Object value); + + void entryUpdated(Object key, Object oldValue, Object newValue); + + void entryRemoved(Object key, Object value); + } +} diff --git a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java index cc16d80b47..4d24c8a35e 100644 --- a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java +++ b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java @@ -19,9 +19,11 @@ package org.apache.tuscany.sca.endpoint.tribes; +import java.net.InetAddress; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.logging.Logger; @@ -29,19 +31,19 @@ import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; import org.apache.catalina.tribes.group.GroupChannel; import org.apache.catalina.tribes.membership.McastService; -import org.apache.catalina.tribes.tipis.ReplicatedMap; -import org.apache.catalina.tribes.tipis.AbstractReplicatedMap.MapEntry; import org.apache.tuscany.sca.assembly.Endpoint; import org.apache.tuscany.sca.assembly.EndpointReference; import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.core.LifeCycleListener; +import org.apache.tuscany.sca.endpoint.tribes.AbstractReplicatedMap.MapEntry; +import org.apache.tuscany.sca.endpoint.tribes.MapStore.MapListener; import org.apache.tuscany.sca.runtime.EndpointListener; import org.apache.tuscany.sca.runtime.EndpointRegistry; /** * A replicated EndpointRegistry based on Apache Tomcat Tribes */ -public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleListener { +public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleListener, MapListener { private final static Logger logger = Logger.getLogger(ReplicatedEndpointRegistry.class.getName()); private static final String MULTICAST_ADDRESS = "228.0.0.100"; private static final int MULTICAST_PORT = 50000; @@ -92,7 +94,6 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi mcastService.setBind(bindAddress); } - // mcastService.setBind("192.168.1.100"); return channel; } @@ -126,6 +127,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi map = new ReplicatedMap(null, createChannel(address, port, bind), timeout, this.domainURI, new ClassLoader[] {ReplicatedEndpointRegistry.class.getClassLoader()}); + map.addListener(this); try { map.getChannel().start(Channel.DEFAULT); } catch (ChannelException e) { @@ -135,6 +137,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi public void stop() { if (map != null) { + map.removeListener(this); Channel channel = map.getChannel(); map.breakdown(); try { @@ -151,12 +154,12 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi for (EndpointListener listener : listeners) { listener.endpointAdded(endpoint); } - logger.info("EndpointRegistry: Add endpoint - " + endpoint); + logger.info("Add endpoint - " + endpoint); } public void addEndpointReference(EndpointReference endpointReference) { endpointreferences.add(endpointReference); - logger.info("EndpointRegistry: Add endpoint reference - " + endpointReference); + logger.info("Add endpoint reference - " + endpointReference); } public void addListener(EndpointListener listener) { @@ -210,7 +213,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi public List findEndpoint(EndpointReference endpointReference) { List foundEndpoints = new ArrayList(); - logger.info("EndpointRegistry: Find endpoint for reference - " + endpointReference); + logger.info("Find endpoint for reference - " + endpointReference); if (endpointReference.getReference() != null) { Endpoint targetEndpoint = endpointReference.getTargetEndpoint(); @@ -219,14 +222,14 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi // TODO: implement more complete matching if (matches(targetEndpoint.getURI(), endpoint.getURI())) { MapEntry entry = map.getInternal(endpoint.getURI()); - if (!entry.getPrimary().equals(map.getChannel().getLocalMember(false))) { + if (!isLocal(entry)) { endpoint.setRemote(true); } // if (!entry.isPrimary()) { - endpoint.setExtensionPointRegistry(registry); + endpoint.setExtensionPointRegistry(registry); // } foundEndpoints.add(endpoint); - logger.info("EndpointRegistry: Found endpoint with matching service - " + endpoint); + logger.info("Found endpoint with matching service - " + endpoint); } // else the service name doesn't match } @@ -234,6 +237,10 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi return foundEndpoints; } + private boolean isLocal(MapEntry entry) { + return entry.getPrimary().equals(map.getChannel().getLocalMember(false)); + } + public List findEndpointReference(Endpoint endpoint) { return endpointreferences; } @@ -247,7 +254,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi } public List getEndpoints() { - return new ArrayList(map.values()); + return new ArrayList(map.values()); } public List getListeners() { @@ -259,12 +266,12 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi for (EndpointListener listener : listeners) { listener.endpointRemoved(endpoint); } - logger.info("EndpointRegistry: Remove endpoint - " + endpoint); + logger.info("Remove endpoint - " + endpoint); } public void removeEndpointReference(EndpointReference endpointReference) { endpointreferences.remove(endpointReference); - logger.info("EndpointRegistry: Remove endpoint reference - " + endpointReference); + logger.info("Remove endpoint reference - " + endpointReference); } public void removeListener(EndpointListener listener) { @@ -282,4 +289,70 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi } } + public void entryAdded(Object key, Object value) { + MapEntry entry = (MapEntry)value; + if (!isLocal(entry)) { + logger.info("Remote endpoint added: " + entry.getValue()); + } + Endpoint newEp = (Endpoint)entry.getValue(); + for (EndpointListener listener : listeners) { + listener.endpointAdded(newEp); + } + } + + public void entryRemoved(Object key, Object value) { + MapEntry entry = (MapEntry)value; + if (!isLocal(entry)) { + logger.info("Remote endpoint removed: " + entry.getValue()); + } + Endpoint oldEp = (Endpoint)entry.getValue(); + for (EndpointListener listener : listeners) { + listener.endpointRemoved(oldEp); + } + } + + public void entryUpdated(Object key, Object oldValue, Object newValue) { + MapEntry oldEntry = (MapEntry)oldValue; + MapEntry newEntry = (MapEntry)newValue; + if (!isLocal(newEntry)) { + logger.info("Remote endpoint updated: " + newEntry.getValue()); + } + Endpoint oldEp = (Endpoint)oldEntry.getValue(); + Endpoint newEp = (Endpoint)newEntry.getValue(); + for (EndpointListener listener : listeners) { + listener.endpointUpdated(oldEp, newEp); + } + } + + public static void main(String[] args) throws Exception { + //create a channel + GroupChannel channel = new GroupChannel(); + McastService mcastService = (McastService)channel.getMembershipService(); + mcastService.setPort(MULTICAST_PORT); + mcastService.setAddress(MULTICAST_ADDRESS); + + InetAddress localhost = InetAddress.getLocalHost(); + + // REVIEW: In my case, there are multiple IP addresses + // One for the WIFI and the other one for VPN. For some reason the VPN one doesn't support + // Multicast + // You can use "route add 228.0.0.0 mask 252.0.0.0 192.168.1.100" + // mcastService.setBind("192.168.1.100"); + channel.start(Channel.DEFAULT); + ReplicatedMap map = new ReplicatedMap(null, channel, 50, "01", null); + map.put(UUID.randomUUID().toString(), localhost.getHostAddress()); + for (int i = 0; i < 4; i++) { + Thread.sleep(3000); + System.out.println(localhost + ": " + map.keySet()); + } + for (Object e : map.entrySetFull()) { + Map.Entry en = (Map.Entry)e; + AbstractReplicatedMap.MapEntry entry = (AbstractReplicatedMap.MapEntry)en.getValue(); + System.out.println(entry); + } + map.breakdown(); + channel.stop(Channel.DEFAULT); + } + + } diff --git a/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java new file mode 100644 index 0000000000..af0bb6fb44 --- /dev/null +++ b/java/sca/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tuscany.sca.endpoint.tribes; + +import java.io.Serializable; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelListener; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.MembershipListener; +import org.apache.catalina.tribes.group.RpcCallback; + +/** + * This file is copied from: + * https://svn.apache.org/repos/asf/tomcat/tc6.0.x/tags/TOMCAT_6_0_20/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java + * + */ +public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback, ChannelListener, MembershipListener { + private static final long serialVersionUID = -6318779627600581121L; + protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(ReplicatedMap.class); + + //------------------------------------------------------------------------------ + // CONSTRUCTORS / DESTRUCTORS + //------------------------------------------------------------------------------ + /** + * Creates a new map + * @param channel The channel to use for communication + * @param timeout long - timeout for RPC messags + * @param mapContextName String - unique name for this map, to allow multiple maps per channel + * @param initialCapacity int - the size of this map, see HashMap + * @param loadFactor float - load factor, see HashMap + */ + public ReplicatedMap(MapOwner owner, + Channel channel, + long timeout, + String mapContextName, + int initialCapacity, + float loadFactor, + ClassLoader[] cls) { + super(owner, channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT, cls); + } + + /** + * Creates a new map + * @param channel The channel to use for communication + * @param timeout long - timeout for RPC messags + * @param mapContextName String - unique name for this map, to allow multiple maps per channel + * @param initialCapacity int - the size of this map, see HashMap + */ + public ReplicatedMap(MapOwner owner, + Channel channel, + long timeout, + String mapContextName, + int initialCapacity, + ClassLoader[] cls) { + super(owner, channel, timeout, mapContextName, initialCapacity, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, + Channel.SEND_OPTIONS_DEFAULT, cls); + } + + /** + * Creates a new map + * @param channel The channel to use for communication + * @param timeout long - timeout for RPC messags + * @param mapContextName String - unique name for this map, to allow multiple maps per channel + */ + public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, ClassLoader[] cls) { + super(owner, channel, timeout, mapContextName, AbstractReplicatedMap.DEFAULT_INITIAL_CAPACITY, + AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT, cls); + } + + //------------------------------------------------------------------------------ + // METHODS TO OVERRIDE + //------------------------------------------------------------------------------ + protected int getStateMessageType() { + return AbstractReplicatedMap.MapMessage.MSG_STATE_COPY; + } + + /** + * publish info about a map pair (key/value) to other nodes in the cluster + * @param key Object + * @param value Object + * @return Member - the backup node + * @throws ChannelException + */ + protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException { + if (!(key instanceof Serializable && value instanceof Serializable)) + return new Member[0]; + //select a backup node + Member[] backup = getMapMembers(); + + if (backup == null || backup.length == 0) + return null; + + //publish the data out to all nodes + MapMessage msg = + new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable)key, (Serializable)value, + null, channel.getLocalMember(false), backup); + + getChannel().send(getMapMembers(), msg, getChannelSendOptions()); + + return backup; + } + +} -- cgit v1.2.3