diff options
Diffstat (limited to 'sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java')
6 files changed, 2320 insertions, 0 deletions
diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java new file mode 100644 index 0000000000..fe683af025 --- /dev/null +++ b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java @@ -0,0 +1,1564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tuscany.sca.endpoint.tribes; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelListener; +import org.apache.catalina.tribes.Heartbeat; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.MembershipListener; +import org.apache.catalina.tribes.group.Response; +import org.apache.catalina.tribes.group.RpcCallback; +import org.apache.catalina.tribes.group.RpcChannel; +import org.apache.catalina.tribes.io.XByteBuffer; +import org.apache.catalina.tribes.membership.MemberImpl; +import org.apache.catalina.tribes.tipis.ReplicatedMapEntry; +import org.apache.catalina.tribes.util.Arrays; +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; + +/** + * This file is copied from: + * https://svn.apache.org/repos/asf/tomcat/tc6.0.x/tags/TOMCAT_6_0_20/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java + * + * We need to intercept the put/remove calls on the base Map to fire events + */ +public abstract class AbstractReplicatedMap extends MapStore implements RpcCallback, ChannelListener, + MembershipListener, Heartbeat { + private static final long serialVersionUID = -2703358020113899074L; + + protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class); + + /** + * The default initial capacity - MUST be a power of two. + */ + public static final int DEFAULT_INITIAL_CAPACITY = 16; + + /** + * The load factor used when none specified in constructor. + **/ + public static final float DEFAULT_LOAD_FACTOR = 0.75f; + + /** + * Used to identify the map + */ + final String chset = "ISO-8859-1"; + + //------------------------------------------------------------------------------ + // INSTANCE VARIABLES + //------------------------------------------------------------------------------ + protected abstract int getStateMessageType(); + + /** + * Timeout for RPC messages, how long we will wait for a reply + */ + protected transient long rpcTimeout = 5000; + /** + * Reference to the channel for sending messages + */ + protected transient Channel channel; + /** + * The RpcChannel to send RPC messages through + */ + protected transient RpcChannel rpcChannel; + /** + * The Map context name makes this map unique, this + * allows us to have more than one map shared + * through one channel + */ + protected transient byte[] mapContextName; + /** + * Has the state been transferred + */ + protected transient boolean stateTransferred = false; + /** + * Simple lock object for transfers + */ + protected transient Object stateMutex = new Object(); + /** + * A list of members in our map + */ + protected transient Map<Member, Long> mapMembers = new HashMap<Member, Long>(); + /** + * Our default send options + */ + protected transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT; + /** + * The owner of this map, ala a SessionManager for example + */ + protected transient MapOwner mapOwner; + /** + * External class loaders if serialization and deserialization is to be performed successfully. + */ + protected transient ClassLoader[] externalLoaders; + + /** + * The node we are currently backing up data to, this index will rotate + * on a round robin basis + */ + protected transient int currentNode = 0; + + /** + * Since the map keeps internal membership + * this is the timeout for a ping message to be responded to + * If a remote map doesn't respond within this timeframe, + * its considered dead. + */ + protected transient long accessTimeout = 5000; + + /** + * Readable string of the mapContextName value + */ + protected transient String mapname = ""; + + //------------------------------------------------------------------------------ + // map owner interface + //------------------------------------------------------------------------------ + + public static interface MapOwner { + public void objectMadePrimay(Object key, Object value); + } + + //------------------------------------------------------------------------------ + // CONSTRUCTORS + //------------------------------------------------------------------------------ + + /** + * Creates a new map + * @param channel The channel to use for communication + * @param timeout long - timeout for RPC messags + * @param mapContextName String - unique name for this map, to allow multiple maps per channel + * @param initialCapacity int - the size of this map, see HashMap + * @param loadFactor float - load factor, see HashMap + * @param cls - a list of classloaders to be used for deserialization of objects. + */ + public AbstractReplicatedMap(MapOwner owner, + Channel channel, + long timeout, + String mapContextName, + int initialCapacity, + float loadFactor, + int channelSendOptions, + ClassLoader[] cls) { + super(initialCapacity, loadFactor, 15); + init(owner, channel, mapContextName, timeout, channelSendOptions, cls); + + } + + /** + * Helper methods, wraps a single member in an array + * @param m Member + * @return Member[] + */ + protected Member[] wrap(Member m) { + if (m == null) + return new Member[0]; + else + return new Member[] {m}; + } + + /** + * Initializes the map by creating the RPC channel, registering itself as a channel listener + * This method is also responsible for initiating the state transfer + * @param owner Object + * @param channel Channel + * @param mapContextName String + * @param timeout long + * @param channelSendOptions int + * @param cls ClassLoader[] + */ + protected void init(MapOwner owner, + Channel channel, + String mapContextName, + long timeout, + int channelSendOptions, + ClassLoader[] cls) { + log.info("Initializing AbstractReplicatedMap with context name:" + mapContextName); + this.mapOwner = owner; + this.externalLoaders = cls; + this.channelSendOptions = channelSendOptions; + this.channel = channel; + this.rpcTimeout = timeout; + + try { + this.mapname = mapContextName; + //unique context is more efficient if it is stored as bytes + this.mapContextName = mapContextName.getBytes(chset); + } catch (UnsupportedEncodingException x) { + log.warn("Unable to encode mapContextName[" + mapContextName + + "] using getBytes(" + + chset + + ") using default getBytes()", x); + this.mapContextName = mapContextName.getBytes(); + } + if (log.isTraceEnabled()) + log.trace("Created Lazy Map with name:" + mapContextName + + ", bytes:" + + Arrays.toString(this.mapContextName)); + + //create an rpc channel and add the map as a listener + this.rpcChannel = new RpcChannel(this.mapContextName, channel, this); + //add this map as a message listener + this.channel.addChannelListener(this); + //listen for membership notifications + this.channel.addMembershipListener(this); + + try { + //broadcast our map, this just notifies other members of our existence + broadcast(MapMessage.MSG_INIT, true); + //transfer state from another map + transferState(); + //state is transferred, we are ready for messaging + broadcast(MapMessage.MSG_START, true); + } catch (ChannelException x) { + log.warn("Unable to send map start message."); + throw new RuntimeException("Unable to start replicated map.", x); + } + } + + /** + * Sends a ping out to all the members in the cluster, not just map members + * that this map is alive. + * @param timeout long + * @throws ChannelException + */ + protected void ping(long timeout) throws ChannelException { + //send out a map membership message, only wait for the first reply + MapMessage msg = + new MapMessage(this.mapContextName, MapMessage.MSG_INIT, false, null, null, null, channel + .getLocalMember(false), null); + if (channel.getMembers().length > 0) { + //send a ping, wait for all nodes to reply + Response[] resp = + rpcChannel.send(channel.getMembers(), + msg, + RpcChannel.ALL_REPLY, + (channelSendOptions), + (int)accessTimeout); + for (int i = 0; i < resp.length; i++) { + memberAlive(resp[i].getSource()); + } //for + } + //update our map of members, expire some if we didn't receive a ping back + synchronized (mapMembers) { + Iterator it = mapMembers.entrySet().iterator(); + long now = System.currentTimeMillis(); + while (it.hasNext()) { + Map.Entry entry = (Map.Entry)it.next(); + long access = ((Long)entry.getValue()).longValue(); + if ((now - access) > timeout) { + it.remove(); + memberDisappeared((Member)entry.getKey()); + } + } + }//synch + } + + /** + * We have received a member alive notification + * @param member Member + */ + protected void memberAlive(Member member) { + synchronized (mapMembers) { + if (!mapMembers.containsKey(member)) { + mapMemberAdded(member); + } //end if + mapMembers.put(member, new Long(System.currentTimeMillis())); + } + } + + /** + * Helper method to broadcast a message to all members in a channel + * @param msgtype int + * @param rpc boolean + * @throws ChannelException + */ + protected void broadcast(int msgtype, boolean rpc) throws ChannelException { + //send out a map membership message, only wait for the first reply + MapMessage msg = + new MapMessage(this.mapContextName, msgtype, false, null, null, null, channel.getLocalMember(false), null); + if (rpc) { + Response[] resp = + rpcChannel.send(channel.getMembers(), msg, RpcChannel.FIRST_REPLY, (channelSendOptions), rpcTimeout); + for (int i = 0; i < resp.length; i++) { + mapMemberAdded(resp[i].getSource()); + messageReceived(resp[i].getMessage(), resp[i].getSource()); + } + } else { + channel.send(channel.getMembers(), msg, channelSendOptions); + } + } + + public void breakdown() { + finalize(); + } + + public void finalize() { + try { + broadcast(MapMessage.MSG_STOP, false); + } catch (Exception ignore) { + } + //cleanup + if (this.rpcChannel != null) { + this.rpcChannel.breakdown(); + } + if (this.channel != null) { + this.channel.removeChannelListener(this); + this.channel.removeMembershipListener(this); + } + this.rpcChannel = null; + this.channel = null; + this.mapMembers.clear(); + super.clear(); + this.stateTransferred = false; + this.externalLoaders = null; + } + + public int hashCode() { + return Arrays.hashCode(this.mapContextName); + } + + public boolean equals(Object o) { + if (o == null) + return false; + if (!(o instanceof AbstractReplicatedMap)) + return false; + if (!(o.getClass().equals(this.getClass()))) + return false; + AbstractReplicatedMap other = (AbstractReplicatedMap)o; + return Arrays.equals(mapContextName, other.mapContextName); + } + + //------------------------------------------------------------------------------ + // GROUP COM INTERFACES + //------------------------------------------------------------------------------ + public Member[] getMapMembers(Map<Member, Long> members) { + synchronized (members) { + Member[] result = new Member[members.size()]; + members.keySet().toArray(result); + return result; + } + } + + public Member[] getMapMembers() { + return getMapMembers(this.mapMembers); + } + + public Member[] getMapMembersExcl(Member[] exclude) { + synchronized (mapMembers) { + Map<Member, Long> list = (Map<Member, Long>)((HashMap<Member, Long>)mapMembers).clone(); + for (int i = 0; i < exclude.length; i++) + list.remove(exclude[i]); + return getMapMembers(list); + } + } + + /** + * Replicates any changes to the object since the last time + * The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated<br> + * @param complete - if set to true, the object is replicated to its backup + * if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will + * be replicated + */ + public void replicate(Object key, boolean complete) { + if (log.isTraceEnabled()) + log.trace("Replicate invoked on key:" + key); + MapEntry entry = (MapEntry)super.get(key); + if (entry == null) + return; + if (!entry.isSerializable()) + return; + if (entry != null && entry.isPrimary() && entry.getBackupNodes() != null && entry.getBackupNodes().length > 0) { + Object value = entry.getValue(); + //check to see if we need to replicate this object isDirty()||complete + boolean repl = complete || ((value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDirty()); + + if (!repl) { + if (log.isTraceEnabled()) + log.trace("Not replicating:" + key + ", no change made"); + + return; + } + //check to see if the message is diffable + boolean diff = ((value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDiffable()); + MapMessage msg = null; + if (diff) { + ReplicatedMapEntry rentry = (ReplicatedMapEntry)entry.getValue(); + try { + rentry.lock(); + //construct a diff message + msg = + new MapMessage(mapContextName, MapMessage.MSG_BACKUP, true, (Serializable)entry.getKey(), null, + rentry.getDiff(), entry.getPrimary(), entry.getBackupNodes()); + } catch (IOException x) { + log.error("Unable to diff object. Will replicate the entire object instead.", x); + } finally { + rentry.unlock(); + } + + } + if (msg == null) { + //construct a complete + msg = + new MapMessage(mapContextName, MapMessage.MSG_BACKUP, false, (Serializable)entry.getKey(), + (Serializable)entry.getValue(), null, entry.getPrimary(), entry.getBackupNodes()); + + } + try { + if (channel != null && entry.getBackupNodes() != null && entry.getBackupNodes().length > 0) { + channel.send(entry.getBackupNodes(), msg, channelSendOptions); + } + } catch (ChannelException x) { + log.error("Unable to replicate data.", x); + } + } //end if + + } + + /** + * This can be invoked by a periodic thread to replicate out any changes. + * For maps that don't store objects that implement ReplicatedMapEntry, this + * method should be used infrequently to avoid large amounts of data transfer + * @param complete boolean + */ + public void replicate(boolean complete) { + Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry<Object, Object> e = i.next(); + replicate(e.getKey(), complete); + } //while + + } + + public void transferState() { + try { + Member[] members = getMapMembers(); + Member backup = members.length > 0 ? (Member)members[0] : null; + if (backup != null) { + MapMessage msg = + new MapMessage(mapContextName, getStateMessageType(), false, null, null, null, null, null); + Response[] resp = + rpcChannel.send(new Member[] {backup}, msg, RpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout); + if (resp.length > 0) { + synchronized (stateMutex) { + msg = (MapMessage)resp[0].getMessage(); + msg.deserialize(getExternalLoaders()); + List list = (List)msg.getValue(); + for (int i = 0; i < list.size(); i++) { + messageReceived((Serializable)list.get(i), resp[0].getSource()); + } //for + } + } else { + log.warn("Transfer state, 0 replies, probably a timeout."); + } + } + } catch (ChannelException x) { + log.error("Unable to transfer LazyReplicatedMap state.", x); + } catch (IOException x) { + log.error("Unable to transfer LazyReplicatedMap state.", x); + } catch (ClassNotFoundException x) { + log.error("Unable to transfer LazyReplicatedMap state.", x); + } + stateTransferred = true; + } + + /** + * @todo implement state transfer + * @param msg Serializable + * @return Serializable - null if no reply should be sent + */ + public Serializable replyRequest(Serializable msg, final Member sender) { + if (!(msg instanceof MapMessage)) + return null; + MapMessage mapmsg = (MapMessage)msg; + + //map init request + if (mapmsg.getMsgType() == MapMessage.MSG_INIT) { + mapmsg.setPrimary(channel.getLocalMember(false)); + return mapmsg; + } + + //map start request + if (mapmsg.getMsgType() == MapMessage.MSG_START) { + mapmsg.setPrimary(channel.getLocalMember(false)); + mapMemberAdded(sender); + return mapmsg; + } + + //backup request + if (mapmsg.getMsgType() == MapMessage.MSG_RETRIEVE_BACKUP) { + MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); + if (entry == null || (!entry.isSerializable())) + return null; + mapmsg.setValue((Serializable)entry.getValue()); + return mapmsg; + } + + //state transfer request + if (mapmsg.getMsgType() == MapMessage.MSG_STATE || mapmsg.getMsgType() == MapMessage.MSG_STATE_COPY) { + synchronized (stateMutex) { //make sure we dont do two things at the same time + ArrayList<MapMessage> list = new ArrayList<MapMessage>(); + Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry<Object, Object> e = i.next(); + MapEntry entry = (MapEntry)super.get(e.getKey()); + if (entry != null && entry.isSerializable()) { + boolean copy = (mapmsg.getMsgType() == MapMessage.MSG_STATE_COPY); + MapMessage me = + new MapMessage(mapContextName, copy ? MapMessage.MSG_COPY : MapMessage.MSG_PROXY, false, + (Serializable)entry.getKey(), copy ? (Serializable)entry.getValue() : null, + null, entry.getPrimary(), entry.getBackupNodes()); + list.add(me); + } + } + mapmsg.setValue(list); + return mapmsg; + + } //synchronized + } + + return null; + + } + + /** + * If the reply has already been sent to the requesting thread, + * the rpc callback can handle any data that comes in after the fact. + * @param msg Serializable + * @param sender Member + */ + public void leftOver(Serializable msg, Member sender) { + //left over membership messages + if (!(msg instanceof MapMessage)) + return; + + MapMessage mapmsg = (MapMessage)msg; + try { + mapmsg.deserialize(getExternalLoaders()); + if (mapmsg.getMsgType() == MapMessage.MSG_START) { + mapMemberAdded(mapmsg.getPrimary()); + } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) { + memberAlive(mapmsg.getPrimary()); + } + } catch (IOException x) { + log.error("Unable to deserialize MapMessage.", x); + } catch (ClassNotFoundException x) { + log.error("Unable to deserialize MapMessage.", x); + } + } + + public void messageReceived(Serializable msg, Member sender) { + if (!(msg instanceof MapMessage)) + return; + + MapMessage mapmsg = (MapMessage)msg; + if (log.isTraceEnabled()) { + log.trace("Map[" + mapname + "] received message:" + mapmsg); + } + + try { + mapmsg.deserialize(getExternalLoaders()); + } catch (IOException x) { + log.error("Unable to deserialize MapMessage.", x); + return; + } catch (ClassNotFoundException x) { + log.error("Unable to deserialize MapMessage.", x); + return; + } + if (log.isTraceEnabled()) + log.trace("Map message received from:" + sender.getName() + " msg:" + mapmsg); + if (mapmsg.getMsgType() == MapMessage.MSG_START) { + mapMemberAdded(mapmsg.getPrimary()); + } + + if (mapmsg.getMsgType() == MapMessage.MSG_STOP) { + memberDisappeared(mapmsg.getPrimary()); + } + + if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) { + MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); + if (entry == null) { + entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue()); + entry.setBackup(false); + entry.setProxy(true); + entry.setBackupNodes(mapmsg.getBackupNodes()); + entry.setPrimary(mapmsg.getPrimary()); + super.put(entry.getKey(), entry); + } else { + entry.setProxy(true); + entry.setBackup(false); + entry.setBackupNodes(mapmsg.getBackupNodes()); + entry.setPrimary(mapmsg.getPrimary()); + } + } + + if (mapmsg.getMsgType() == MapMessage.MSG_REMOVE) { + super.remove(mapmsg.getKey()); + } + + if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP || mapmsg.getMsgType() == MapMessage.MSG_COPY) { + MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); + if (entry == null) { + entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue()); + entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP); + entry.setProxy(false); + entry.setBackupNodes(mapmsg.getBackupNodes()); + entry.setPrimary(mapmsg.getPrimary()); + if (mapmsg.getValue() != null && mapmsg.getValue() instanceof ReplicatedMapEntry) { + ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner()); + } + } else { + entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP); + entry.setProxy(false); + entry.setBackupNodes(mapmsg.getBackupNodes()); + entry.setPrimary(mapmsg.getPrimary()); + if (entry.getValue() instanceof ReplicatedMapEntry) { + ReplicatedMapEntry diff = (ReplicatedMapEntry)entry.getValue(); + if (mapmsg.isDiff()) { + try { + diff.lock(); + diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length); + } catch (Exception x) { + log.error("Unable to apply diff to key:" + entry.getKey(), x); + } finally { + diff.unlock(); + } + } else { + if (mapmsg.getValue() != null) + entry.setValue(mapmsg.getValue()); + ((ReplicatedMapEntry)entry.getValue()).setOwner(getMapOwner()); + } //end if + } else if (mapmsg.getValue() instanceof ReplicatedMapEntry) { + ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue(); + re.setOwner(getMapOwner()); + entry.setValue(re); + } else { + if (mapmsg.getValue() != null) + entry.setValue(mapmsg.getValue()); + } //end if + } //end if + super.put(entry.getKey(), entry); + } //end if + } + + public boolean accept(Serializable msg, Member sender) { + boolean result = false; + if (msg instanceof MapMessage) { + if (log.isTraceEnabled()) + log.trace("Map[" + mapname + "] accepting...." + msg); + result = Arrays.equals(mapContextName, ((MapMessage)msg).getMapId()); + if (log.isTraceEnabled()) + log.trace("Msg[" + mapname + "] accepted[" + result + "]...." + msg); + } + return result; + } + + public void mapMemberAdded(Member member) { + Member self = getChannel().getLocalMember(false); + if (member.equals(self)) + return; + boolean memberAdded = false; + //select a backup node if we don't have one + synchronized (mapMembers) { + if (!mapMembers.containsKey(member)) { + mapMembers.put(member, new Long(System.currentTimeMillis())); + memberAdded = true; + } + } + if (memberAdded) { + synchronized (stateMutex) { + Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry<Object, Object> e = i.next(); + MapEntry entry = (MapEntry)super.get(e.getKey()); + if (entry == null) + continue; + // if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) { + // [rfeng] Change the behavior to replicate to all nodes + if (entry.isPrimary() && self.equals(entry.getPrimary())) { + try { + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); + entry.setBackupNodes(backup); + entry.setPrimary(self); + } catch (ChannelException x) { + log.error("Unable to select backup node.", x); + } //catch + } //end if + } //while + } //synchronized + }//end if + } + + public boolean inSet(Member m, Member[] set) { + if (set == null) + return false; + boolean result = false; + for (int i = 0; i < set.length && (!result); i++) + if (m.equals(set[i])) + result = true; + return result; + } + + public Member[] excludeFromSet(Member[] mbrs, Member[] set) { + List<Member> result = new ArrayList<Member>(); + for (int i = 0; i < set.length; i++) { + boolean include = true; + for (int j = 0; j < mbrs.length; j++) + if (mbrs[j].equals(set[i])) + include = false; + if (include) + result.add(set[i]); + } + return (Member[])result.toArray(new Member[result.size()]); + } + + public void memberAdded(Member member) { + //do nothing + } + + public void memberDisappeared(Member member) { + boolean removed = false; + synchronized (mapMembers) { + removed = (mapMembers.remove(member) != null); + if (!removed) { + if (log.isDebugEnabled()) + log.debug("Member[" + member + "] disappeared, but was not present in the map."); + return; //the member was not part of our map. + } + } + + Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry<Object, Object> e = i.next(); + MapEntry entry = (MapEntry)super.get(e.getKey()); + if (entry == null) + continue; + if (entry.isPrimary() && inSet(member, entry.getBackupNodes())) { + if (log.isDebugEnabled()) + log.debug("[1] Primary choosing a new backup"); + try { + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); + entry.setBackupNodes(backup); + entry.setPrimary(channel.getLocalMember(false)); + } catch (ChannelException x) { + log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x); + } + } else if (member.equals(entry.getPrimary())) { + if (log.isDebugEnabled()) + log.debug("[2] Primary disappeared"); + entry.setPrimary(null); + } //end if + + if (entry.isProxy() && entry.getPrimary() == null + && entry.getBackupNodes() != null + && entry.getBackupNodes().length == 1 + && entry.getBackupNodes()[0].equals(member)) { + //remove proxies that have no backup nor primaries + if (log.isDebugEnabled()) + log.debug("[3] Removing orphaned proxy"); + i.remove(); + } else if (entry.getPrimary() == null && entry.isBackup() + && entry.getBackupNodes() != null + && entry.getBackupNodes().length == 1 + && entry.getBackupNodes()[0].equals(channel.getLocalMember(false))) { + try { + if (log.isDebugEnabled()) + log.debug("[4] Backup becoming primary"); + entry.setPrimary(channel.getLocalMember(false)); + entry.setBackup(false); + entry.setProxy(false); + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); + entry.setBackupNodes(backup); + if (mapOwner != null) + mapOwner.objectMadePrimay(entry.getKey(), entry.getValue()); + + } catch (ChannelException x) { + log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x); + } + } + + } //while + } + + public int getNextBackupIndex() { + int size = mapMembers.size(); + if (mapMembers.size() == 0) + return -1; + int node = currentNode++; + if (node >= size) { + node = 0; + currentNode = 0; + } + return node; + } + + public Member getNextBackupNode() { + Member[] members = getMapMembers(); + int node = getNextBackupIndex(); + if (members.length == 0 || node == -1) + return null; + if (node >= members.length) + node = 0; + return members[node]; + } + + protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException; + + public void heartbeat() { + try { + ping(accessTimeout); + } catch (Exception x) { + log.error("Unable to send AbstractReplicatedMap.ping message", x); + } + } + + //------------------------------------------------------------------------------ + // METHODS TO OVERRIDE + //------------------------------------------------------------------------------ + + /** + * Removes an object from this map, it will also remove it from + * + * @param key Object + * @return Object + */ + public Object remove(Object key) { + return remove(key, true); + } + + public Object remove(Object key, boolean notify) { + MapEntry entry = (MapEntry)super.remove(key); + + try { + if (getMapMembers().length > 0 && notify) { + MapMessage msg = + new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable)key, null, null, + null, null); + getChannel().send(getMapMembers(), msg, getChannelSendOptions()); + } + } catch (ChannelException x) { + log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation", x); + } + return entry != null ? entry.getValue() : null; + } + + public MapEntry getInternal(Object key) { + return (MapEntry)super.get(key); + } + + public Object get(Object key) { + MapEntry entry = (MapEntry)super.get(key); + if (log.isTraceEnabled()) + log.trace("Requesting id:" + key + " entry:" + entry); + if (entry == null) + return null; + if (!entry.isPrimary()) { + //if the message is not primary, we need to retrieve the latest value + try { + Member[] backup = null; + MapMessage msg = null; + if (!entry.isBackup()) { + //make sure we don't retrieve from ourselves + msg = + new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false, (Serializable)key, + null, null, null, null); + Response[] resp = + getRpcChannel().send(entry.getBackupNodes(), + msg, + RpcChannel.FIRST_REPLY, + Channel.SEND_OPTIONS_DEFAULT, + getRpcTimeout()); + if (resp == null || resp.length == 0) { + //no responses + log.warn("Unable to retrieve remote object for key:" + key); + return null; + } + msg = (MapMessage)resp[0].getMessage(); + msg.deserialize(getExternalLoaders()); + backup = entry.getBackupNodes(); + if (entry.getValue() instanceof ReplicatedMapEntry) { + ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue(); + val.setOwner(getMapOwner()); + } + if (msg.getValue() != null) + entry.setValue(msg.getValue()); + } + if (entry.isBackup()) { + //select a new backup node + backup = publishEntryInfo(key, entry.getValue()); + } else if (entry.isProxy()) { + //invalidate the previous primary + msg = + new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false, (Serializable)key, null, null, + channel.getLocalMember(false), backup); + Member[] dest = getMapMembersExcl(backup); + if (dest != null && dest.length > 0) { + getChannel().send(dest, msg, getChannelSendOptions()); + } + } + entry.setPrimary(channel.getLocalMember(false)); + entry.setBackupNodes(backup); + entry.setBackup(false); + entry.setProxy(false); + + } catch (Exception x) { + log.error("Unable to replicate out data for a LazyReplicatedMap.get operation", x); + return null; + } + } + if (log.isTraceEnabled()) + log.trace("Requesting id:" + key + " result:" + entry.getValue()); + if (entry.getValue() != null && entry.getValue() instanceof ReplicatedMapEntry) { + ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue(); + //hack, somehow this is not being set above + val.setOwner(getMapOwner()); + + } + return entry.getValue(); + } + + protected void printMap(String header) { + try { + System.out.println("\nDEBUG MAP:" + header); + System.out.println("Map[" + new String(mapContextName, chset) + ", Map Size:" + super.size()); + Member[] mbrs = getMapMembers(); + for (int i = 0; i < mbrs.length; i++) { + System.out.println("Mbr[" + (i + 1) + "=" + mbrs[i].getName()); + } + Iterator i = super.entrySet().iterator(); + int cnt = 0; + + while (i.hasNext()) { + Map.Entry e = (Map.Entry)i.next(); + System.out.println((++cnt) + ". " + super.get(e.getKey())); + } + System.out.println("EndMap]\n\n"); + } catch (Exception ignore) { + ignore.printStackTrace(); + } + } + + /** + * Returns true if the key has an entry in the map. + * The entry can be a proxy or a backup entry, invoking <code>get(key)</code> + * will make this entry primary for the group + * @param key Object + * @return boolean + */ + public boolean containsKey(Object key) { + return super.containsKey(key); + } + + public Object put(Object key, Object value) { + return put(key, value, true); + } + + public Object put(Object key, Object value, boolean notify) { + MapEntry entry = new MapEntry(key, value); + entry.setBackup(false); + entry.setProxy(false); + entry.setPrimary(channel.getLocalMember(false)); + + Object old = null; + + //make sure that any old values get removed + if (containsKey(key)) + old = remove(key); + try { + if (notify) { + Member[] backup = publishEntryInfo(key, value); + entry.setBackupNodes(backup); + } + } catch (ChannelException x) { + log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x); + } + super.put(key, entry); + return old; + } + + /** + * Copies all values from one map to this instance + * @param m Map + */ + public void putAll(Map m) { + Iterator i = m.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry entry = (Map.Entry)i.next(); + put(entry.getKey(), entry.getValue()); + } + } + + public void clear() { + clear(true); + } + + public void clear(boolean notify) { + if (notify) { + //only delete active keys + Iterator<Object> keys = keySet().iterator(); + while (keys.hasNext()) + remove(keys.next()); + } else { + super.clear(); + } + } + + public boolean containsValue(Object value) { + if (value == null) { + return super.containsValue(value); + } else { + Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry<Object, Object> e = i.next(); + MapEntry entry = (MapEntry)super.get(e.getKey()); + if (entry != null && entry.isPrimary() && value.equals(entry.getValue())) + return true; + }//while + return false; + }//end if + } + + public Object clone() { + throw new UnsupportedOperationException("This operation is not valid on a replicated map"); + } + + /** + * Returns the entire contents of the map + * Map.Entry.getValue() will return a LazyReplicatedMap.MapEntry object containing all the information + * about the object. + * @return Set + */ + public Set<Map.Entry<Object, Object>> entrySetFull() { + return super.entrySet(); + } + + public Set<Object> keySetFull() { + return super.keySet(); + } + + public int sizeFull() { + return super.size(); + } + + public Set<Map.Entry<Object, Object>> entrySet() { + Set<Map.Entry<Object, Object>> set = new LinkedHashSet<Map.Entry<Object, Object>>(super.size()); + Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry<Object, Object> e = i.next(); + Object key = e.getKey(); + MapEntry entry = (MapEntry)super.get(key); + if (entry != null && entry.isPrimary()) { + set.add(new MapEntry(key, entry.getValue())); + } + } + return Collections.unmodifiableSet(set); + } + + public Set<Object> keySet() { + //todo implement + //should only return keys where this is active. + LinkedHashSet<Object> set = new LinkedHashSet<Object>(super.size()); + Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry<Object, Object> e = i.next(); + Object key = e.getKey(); + MapEntry entry = (MapEntry)super.get(key); + if (entry != null && entry.isPrimary()) + set.add(key); + } + return Collections.unmodifiableSet(set); + + } + + public int size() { + //todo, implement a counter variable instead + //only count active members in this node + int counter = 0; + Iterator<Map.Entry<Object, Object>> it = super.entrySet().iterator(); + while (it != null && it.hasNext()) { + Map.Entry<Object, Object> e = it.next(); + if (e != null) { + MapEntry entry = (MapEntry)super.get(e.getKey()); + if (entry != null && entry.isPrimary() && entry.getValue() != null) + counter++; + } + } + return counter; + } + + protected boolean removeEldestEntry(Map.Entry<Object, Object> eldest) { + return false; + } + + public boolean isEmpty() { + return size() == 0; + } + + public Collection<Object> values() { + ArrayList<Object> values = new ArrayList<Object>(); + Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator(); + while (i.hasNext()) { + Map.Entry<Object, Object> e = i.next(); + MapEntry entry = (MapEntry)super.get(e.getKey()); + if (entry != null && entry.isPrimary() && entry.getValue() != null) + values.add(entry.getValue()); + } + return Collections.unmodifiableCollection(values); + } + + //------------------------------------------------------------------------------ + // Map Entry class + //------------------------------------------------------------------------------ + public static class MapEntry implements Map.Entry<Object, Object> { + private boolean backup; + private boolean proxy; + private Member[] backupNodes; + private Member primary; + private Object key; + private Object value; + + public MapEntry(Object key, Object value) { + setKey(key); + setValue(value); + + } + + public boolean isKeySerializable() { + return (key == null) || (key instanceof Serializable); + } + + public boolean isValueSerializable() { + return (value == null) || (value instanceof Serializable); + } + + public boolean isSerializable() { + return isKeySerializable() && isValueSerializable(); + } + + public boolean isBackup() { + return backup; + } + + public void setBackup(boolean backup) { + this.backup = backup; + } + + public boolean isProxy() { + return proxy; + } + + public boolean isPrimary() { + return ((!proxy) && (!backup)); + } + + public void setProxy(boolean proxy) { + this.proxy = proxy; + } + + public boolean isDiffable() { + return (value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDiffable(); + } + + public void setBackupNodes(Member[] nodes) { + this.backupNodes = nodes; + } + + public Member[] getBackupNodes() { + return backupNodes; + } + + public void setPrimary(Member m) { + primary = m; + } + + public Member getPrimary() { + return primary; + } + + public Object getValue() { + return value; + } + + public Object setValue(Object value) { + Object old = this.value; + this.value = value; + return old; + } + + public Object getKey() { + return key; + } + + public Object setKey(Object key) { + Object old = this.key; + this.key = key; + return old; + } + + public int hashCode() { + return key.hashCode(); + } + + public boolean equals(Object o) { + return key.equals(o); + } + + /** + * apply a diff, or an entire object + * @param data byte[] + * @param offset int + * @param length int + * @param diff boolean + * @throws IOException + * @throws ClassNotFoundException + */ + public void apply(byte[] data, int offset, int length, boolean diff) throws IOException, ClassNotFoundException { + if (isDiffable() && diff) { + ReplicatedMapEntry rentry = (ReplicatedMapEntry)value; + try { + rentry.lock(); + rentry.applyDiff(data, offset, length); + } finally { + rentry.unlock(); + } + } else if (length == 0) { + value = null; + proxy = true; + } else { + value = XByteBuffer.deserialize(data, offset, length); + } + } + + public String toString() { + StringBuffer buf = new StringBuffer("MapEntry[key:"); + buf.append(getKey()).append("; "); + buf.append("value:").append(getValue()).append("; "); + buf.append("primary:").append(isPrimary()).append("; "); + buf.append("backup:").append(isBackup()).append("; "); + buf.append("proxy:").append(isProxy()).append(";]"); + return buf.toString(); + } + + } + + //------------------------------------------------------------------------------ + // map message to send to and from other maps + //------------------------------------------------------------------------------ + + public static class MapMessage implements Serializable { + private static final long serialVersionUID = -7847288807489375686L; + public static final int MSG_BACKUP = 1; + public static final int MSG_RETRIEVE_BACKUP = 2; + public static final int MSG_PROXY = 3; + public static final int MSG_REMOVE = 4; + public static final int MSG_STATE = 5; + public static final int MSG_START = 6; + public static final int MSG_STOP = 7; + public static final int MSG_INIT = 8; + public static final int MSG_COPY = 9; + public static final int MSG_STATE_COPY = 10; + + private byte[] mapId; + private int msgtype; + private boolean diff; + private transient Serializable key; + private transient Serializable value; + private byte[] valuedata; + private byte[] keydata; + private byte[] diffvalue; + private Member[] nodes; + private Member primary; + + public String toString() { + StringBuffer buf = new StringBuffer("MapMessage[context="); + buf.append(new String(mapId)); + buf.append("; type="); + buf.append(getTypeDesc()); + buf.append("; key="); + buf.append(key); + buf.append("; value="); + buf.append(value); + return buf.toString(); + } + + public String getTypeDesc() { + switch (msgtype) { + case MSG_BACKUP: + return "MSG_BACKUP"; + case MSG_RETRIEVE_BACKUP: + return "MSG_RETRIEVE_BACKUP"; + case MSG_PROXY: + return "MSG_PROXY"; + case MSG_REMOVE: + return "MSG_REMOVE"; + case MSG_STATE: + return "MSG_STATE"; + case MSG_START: + return "MSG_START"; + case MSG_STOP: + return "MSG_STOP"; + case MSG_INIT: + return "MSG_INIT"; + case MSG_STATE_COPY: + return "MSG_STATE_COPY"; + case MSG_COPY: + return "MSG_COPY"; + default: + return "UNKNOWN"; + } + } + + public MapMessage() { + } + + public MapMessage(byte[] mapId, + int msgtype, + boolean diff, + Serializable key, + Serializable value, + byte[] diffvalue, + Member primary, + Member[] nodes) { + this.mapId = mapId; + this.msgtype = msgtype; + this.diff = diff; + this.key = key; + this.value = value; + this.diffvalue = diffvalue; + this.nodes = nodes; + this.primary = primary; + setValue(value); + setKey(key); + } + + public void deserialize(ClassLoader[] cls) throws IOException, ClassNotFoundException { + key(cls); + value(cls); + } + + public int getMsgType() { + return msgtype; + } + + public boolean isDiff() { + return diff; + } + + public Serializable getKey() { + try { + return key(null); + } catch (Exception x) { + log.error("Deserialization error of the MapMessage.key", x); + return null; + } + } + + public Serializable key(ClassLoader[] cls) throws IOException, ClassNotFoundException { + if (key != null) + return key; + if (keydata == null || keydata.length == 0) + return null; + key = XByteBuffer.deserialize(keydata, 0, keydata.length, cls); + keydata = null; + return key; + } + + public byte[] getKeyData() { + return keydata; + } + + public Serializable getValue() { + try { + return value(null); + } catch (Exception x) { + log.error("Deserialization error of the MapMessage.value", x); + return null; + } + } + + public Serializable value(ClassLoader[] cls) throws IOException, ClassNotFoundException { + if (value != null) + return value; + if (valuedata == null || valuedata.length == 0) + return null; + value = XByteBuffer.deserialize(valuedata, 0, valuedata.length, cls); + valuedata = null; + ; + return value; + } + + public byte[] getValueData() { + return valuedata; + } + + public byte[] getDiffValue() { + return diffvalue; + } + + public Member[] getBackupNodes() { + return nodes; + } + + private void setBackUpNodes(Member[] nodes) { + this.nodes = nodes; + } + + public Member getPrimary() { + return primary; + } + + private void setPrimary(Member m) { + primary = m; + } + + public byte[] getMapId() { + return mapId; + } + + public void setValue(Serializable value) { + try { + if (value != null) + valuedata = XByteBuffer.serialize(value); + this.value = value; + } catch (IOException x) { + throw new RuntimeException(x); + } + } + + public void setKey(Serializable key) { + try { + if (key != null) + keydata = XByteBuffer.serialize(key); + this.key = key; + } catch (IOException x) { + throw new RuntimeException(x); + } + } + + protected Member[] readMembers(ObjectInput in) throws IOException, ClassNotFoundException { + int nodecount = in.readInt(); + Member[] members = new Member[nodecount]; + for (int i = 0; i < members.length; i++) { + byte[] d = new byte[in.readInt()]; + in.read(d); + if (d.length > 0) + members[i] = MemberImpl.getMember(d); + } + return members; + } + + protected void writeMembers(ObjectOutput out, Member[] members) throws IOException { + if (members == null) + members = new Member[0]; + out.writeInt(members.length); + for (int i = 0; i < members.length; i++) { + if (members[i] != null) { + byte[] d = members[i] != null ? ((MemberImpl)members[i]).getData(false) : new byte[0]; + out.writeInt(d.length); + out.write(d); + } + } + } + + /** + * shallow clone + * @return Object + */ + public Object clone() { + MapMessage msg = + new MapMessage(this.mapId, this.msgtype, this.diff, this.key, this.value, this.diffvalue, this.primary, + this.nodes); + msg.keydata = this.keydata; + msg.valuedata = this.valuedata; + return msg; + } + } //MapMessage + + public Channel getChannel() { + return channel; + } + + public byte[] getMapContextName() { + return mapContextName; + } + + public RpcChannel getRpcChannel() { + return rpcChannel; + } + + public long getRpcTimeout() { + return rpcTimeout; + } + + public Object getStateMutex() { + return stateMutex; + } + + public boolean isStateTransferred() { + return stateTransferred; + } + + public MapOwner getMapOwner() { + return mapOwner; + } + + public ClassLoader[] getExternalLoaders() { + return externalLoaders; + } + + public int getChannelSendOptions() { + return channelSendOptions; + } + + public long getAccessTimeout() { + return accessTimeout; + } + + public void setMapOwner(MapOwner mapOwner) { + this.mapOwner = mapOwner; + } + + public void setExternalLoaders(ClassLoader[] externalLoaders) { + this.externalLoaders = externalLoaders; + } + + public void setChannelSendOptions(int channelSendOptions) { + this.channelSendOptions = channelSendOptions; + } + + public void setAccessTimeout(long accessTimeout) { + this.accessTimeout = accessTimeout; + } + +} diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/DisableMcastInterceptor.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/DisableMcastInterceptor.java new file mode 100644 index 0000000000..1e4d3af50d --- /dev/null +++ b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/DisableMcastInterceptor.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.endpoint.tribes; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.group.ChannelInterceptorBase; + +/** + * A static interceptor to disables multicast. + * Can be removed when/if the function gets added to Tribes. + * See Tomcat email http://markmail.org/message/doqu7pfl2hvvdfcl + */ +public class DisableMcastInterceptor extends ChannelInterceptorBase { + + public DisableMcastInterceptor() { + super(); + } + + public void start(int svc) throws ChannelException { + svc = (svc & (~Channel.MBR_TX_SEQ)); + super.start(svc); + } +} diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java new file mode 100644 index 0000000000..9e540743bf --- /dev/null +++ b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.endpoint.tribes; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * The Map that can fire events on put/remove of entries + */ +public abstract class MapStore extends ConcurrentHashMap<Object, Object> { + private static final long serialVersionUID = -2127235547082144368L; + private List<MapListener> listeners = new CopyOnWriteArrayList<MapListener>(); + + protected MapStore(int initialCapacity, float loadFactor, int concurrencyLevel) { + super(initialCapacity, loadFactor, concurrencyLevel); + } + + @Override + public Object put(Object key, Object value) { + Object old = super.put(key, value); + if (old != null) { + for (MapListener listener : listeners) { + listener.entryUpdated(key, old, value); + } + } else { + for (MapListener listener : listeners) { + listener.entryAdded(key, value); + } + + } + return old; + } + + @Override + public Object remove(Object key) { + Object old = super.remove(key); + if (old != null) { + for (MapListener listener : listeners) { + listener.entryRemoved(key, old); + } + } + return old; + } + + public void addListener(MapListener listener) { + listeners.add(listener); + } + + public List<MapListener> getListeners() { + return listeners; + } + + public boolean removeListener(MapListener listener) { + return listeners.remove(listener); + } + + public static interface MapListener { + void entryAdded(Object key, Object value); + + void entryUpdated(Object key, Object oldValue, Object newValue); + + void entryRemoved(Object key, Object value); + } +} diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java new file mode 100644 index 0000000000..5dc627d88d --- /dev/null +++ b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.endpoint.tribes; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.URI; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.StringTokenizer; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelReceiver; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.group.GroupChannel; +import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor; +import org.apache.catalina.tribes.membership.McastService; +import org.apache.catalina.tribes.membership.StaticMember; +import org.apache.catalina.tribes.transport.ReceiverBase; +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.LifeCycleListener; +import org.apache.tuscany.sca.endpoint.tribes.AbstractReplicatedMap.MapEntry; +import org.apache.tuscany.sca.endpoint.tribes.MapStore.MapListener; +import org.apache.tuscany.sca.runtime.BaseEndpointRegistry; +import org.apache.tuscany.sca.runtime.DomainRegistryURI; +import org.apache.tuscany.sca.runtime.EndpointRegistry; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; + +/** + * A replicated EndpointRegistry based on Apache Tomcat Tribes + */ +public class ReplicatedEndpointRegistry extends BaseEndpointRegistry implements EndpointRegistry, LifeCycleListener, + MapListener { + private final static Logger logger = Logger.getLogger(ReplicatedEndpointRegistry.class.getName()); + private static final String MULTICAST_ADDRESS = "228.0.0.100"; + private static final int MULTICAST_PORT = 50000; + + private static final int FIND_REPEAT_COUNT = 10; + + private int port = MULTICAST_PORT; + private String address = MULTICAST_ADDRESS; + private String bind = null; + private int timeout = 50; + private String receiverAddress; + private int receiverPort = 4000; + private int receiverAutoBind = 100; + private List<URI> staticRoutes; + + private ReplicatedMap map; + + private String id; + private boolean noMultiCast; + + private static final GroupChannel createChannel(String address, int port, String bindAddress) { + + //create a channel + GroupChannel channel = new GroupChannel(); + McastService mcastService = (McastService)channel.getMembershipService(); + mcastService.setPort(port); + mcastService.setAddress(address); + + // REVIEW: In my case, there are multiple IP addresses + // One for the WIFI and the other one for VPN. For some reason the VPN one doesn't support + // Multicast + + if (bindAddress != null) { + mcastService.setBind(bindAddress); + } else { + mcastService.setBind(getBindAddress()); + } + + return channel; + } + + public ReplicatedEndpointRegistry(ExtensionPointRegistry registry, + Map<String, String> attributes, + String domainRegistryURI, + String domainURI) { + super(registry, attributes, domainRegistryURI, domainURI); + getParameters(attributes, domainRegistryURI); + } + + private Map<String, String> getParameters(Map<String, String> attributes, String domainRegistryURI) { + Map<String, String> map = new HashMap<String, String>(); + if (attributes != null) { + map.putAll(attributes); + } + URI uri = URI.create(domainRegistryURI); + if (uri.getHost() != null) { + map.put("address", uri.getHost()); + } + if (uri.getPort() != -1) { + map.put("port", String.valueOf(uri.getPort())); + } + + if (domainRegistryURI.startsWith("tuscany")) { + setTuscanyConfig(map, domainRegistryURI); + setConfig(map); + return map; + } + + int index = domainRegistryURI.indexOf('?'); + if (index == -1) { + setConfig(map); + return map; + } + String query = domainRegistryURI.substring(index + 1); + try { + query = URLDecoder.decode(query, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new IllegalArgumentException(e); + } + String[] params = query.split("&"); + for (String param : params) { + index = param.indexOf('='); + if (index != -1) { + map.put(param.substring(0, index), param.substring(index + 1)); + } + } + setConfig(map); + return map; + } + + private void setTuscanyConfig(Map<String, String> map, String domainRegistryURI) { + DomainRegistryURI tuscanyURI = new DomainRegistryURI(domainRegistryURI); + map.put("address", tuscanyURI.getMulticastAddress()); + map.put("port", Integer.toString(tuscanyURI.getMulticastPort())); + map.put("bind", tuscanyURI.getBindAddress()); + map.put("receiverPort", Integer.toString(tuscanyURI.getListenPort())); + if (tuscanyURI.isMulticastDisabled()) { + map.put("nomcast", "true"); + } + if (tuscanyURI.getRemotes().size() > 0) { + String routes = ""; + for (int i=0; i<tuscanyURI.getRemotes().size(); i++) { + routes += tuscanyURI.getRemotes().get(i); + if (i < tuscanyURI.getRemotes().size()) { + routes += ","; + } + } + map.put("routes", routes); + } + } + + private void setConfig(Map<String, String> attributes) { + String portStr = attributes.get("port"); + if (portStr != null) { + port = Integer.parseInt(portStr); + if (port == -1) { + port = MULTICAST_PORT; + } + } + String address = attributes.get("address"); + if (address == null) { + address = MULTICAST_ADDRESS; + } + bind = attributes.get("bind"); + String timeoutStr = attributes.get("timeout"); + if (timeoutStr != null) { + timeout = Integer.parseInt(timeoutStr); + } + + String routesStr = attributes.get("routes"); + if (routesStr != null) { + StringTokenizer st = new StringTokenizer(routesStr); + staticRoutes = new ArrayList<URI>(); + while (st.hasMoreElements()) { + staticRoutes.add(URI.create("tcp://" + st.nextToken())); + } + } + String mcast = attributes.get("nomcast"); + if (mcast != null) { + noMultiCast = Boolean.valueOf(mcast); + } + receiverAddress = attributes.get("receiverAddress"); + String recvPort = attributes.get("receiverPort"); + if (recvPort != null) { + receiverPort = Integer.parseInt(recvPort); + } + String recvAutoBind = attributes.get("receiverAutoBind"); + if (recvAutoBind != null) { + receiverAutoBind = Integer.parseInt(recvAutoBind); + } + } + + public void start() { + if (map != null) { + throw new IllegalStateException("The registry has already been started"); + } + GroupChannel channel = createChannel(address, port, bind); + map = + new ReplicatedMap(null, channel, timeout, this.domainURI, + new ClassLoader[] {ReplicatedEndpointRegistry.class.getClassLoader()}); + map.addListener(this); + + if (noMultiCast) { + map.getChannel().addInterceptor(new DisableMcastInterceptor()); + } + + // Configure the receiver ports + ChannelReceiver receiver = channel.getChannelReceiver(); + if (receiver instanceof ReceiverBase) { + if (receiverAddress != null) { + ((ReceiverBase)receiver).setAddress(receiverAddress); + } + ((ReceiverBase)receiver).setPort(receiverPort); + ((ReceiverBase)receiver).setAutoBind(receiverAutoBind); + } + + /* + Object sender = channel.getChannelSender(); + if (sender instanceof ReplicationTransmitter) { + sender = ((ReplicationTransmitter)sender).getTransport(); + } + if (sender instanceof AbstractSender) { + ((AbstractSender)sender).setKeepAliveCount(0); + ((AbstractSender)sender).setMaxRetryAttempts(5); + } + */ + + if (staticRoutes != null) { + StaticMembershipInterceptor smi = new StaticMembershipInterceptor(); + for (URI staticRoute : staticRoutes) { + Member member; + try { + // The port has to match the receiver port + member = new StaticMember(staticRoute.getHost(), staticRoute.getPort(), 5000); + } catch (IOException e) { + throw new RuntimeException(e); + } + smi.addStaticMember(member); + logger.info("Added static route: " + staticRoute.getHost() + ":" + staticRoute.getPort()); + } + smi.setLocalMember(map.getChannel().getLocalMember(false)); + map.getChannel().addInterceptor(smi); + } + + try { + map.getChannel().start(Channel.DEFAULT); + } catch (ChannelException e) { + throw new IllegalStateException(e); + } + + } + + public void stop() { + if (map != null) { + map.removeListener(this); + Channel channel = map.getChannel(); + map.breakdown(); + try { + channel.stop(Channel.DEFAULT); + } catch (ChannelException e) { + logger.log(Level.WARNING, e.getMessage(), e); + } + map = null; + } + } + + public void addEndpoint(Endpoint endpoint) { + map.put(endpoint.getURI(), endpoint); + logger.info("Add endpoint - " + endpoint); + } + + public List<Endpoint> findEndpoint(String uri) { + List<Endpoint> foundEndpoints = new ArrayList<Endpoint>(); + + // in the failure case we repeat the look up after a short + // delay to take account of tribes replication delays + int repeat = FIND_REPEAT_COUNT; + + while (repeat > 0) { + for (Object v : map.values()) { + Endpoint endpoint = (Endpoint)v; + // TODO: implement more complete matching + logger.fine("Matching against - " + endpoint); + if (endpoint.matches(uri)) { + MapEntry entry = map.getInternal(endpoint.getURI()); + // if (!entry.isPrimary()) { + ((RuntimeEndpoint)endpoint).bind(registry, this); + // } + foundEndpoints.add(endpoint); + logger.fine("Found endpoint with matching service - " + endpoint); + repeat = 0; + } + // else the service name doesn't match + } + + if (foundEndpoints.size() == 0) { + // the service name doesn't match any endpoints so wait a little and try + // again in case this is caused by tribes synch delays + logger.info("Repeating endpoint reference match - " + uri); + repeat--; + try { + Thread.sleep(1000); + } catch (Exception ex) { + // do nothing + repeat = 0; + } + } + } + + return foundEndpoints; + } + + private boolean isLocal(MapEntry entry) { + return entry.getPrimary().equals(map.getChannel().getLocalMember(false)); + } + + public Endpoint getEndpoint(String uri) { + return (Endpoint)map.get(uri); + } + + public List<Endpoint> getEndpoints() { + return new ArrayList(map.values()); + } + + public void removeEndpoint(Endpoint endpoint) { + map.remove(endpoint.getURI()); + logger.info("Remove endpoint - " + endpoint); + } + + public void replicate(boolean complete) { + map.replicate(complete); + } + + public void updateEndpoint(String uri, Endpoint endpoint) { + Endpoint oldEndpoint = getEndpoint(uri); + if (oldEndpoint == null) { + throw new IllegalArgumentException("Endpoint is not found: " + uri); + } + map.put(endpoint.getURI(), endpoint); + } + + public void entryAdded(Object key, Object value) { + MapEntry entry = (MapEntry)value; + Endpoint newEp = (Endpoint)entry.getValue(); + if (!isLocal(entry)) { + logger.info(id + " Remote endpoint added: " + entry.getValue()); + } + endpointAdded(newEp); + } + + public void entryRemoved(Object key, Object value) { + MapEntry entry = (MapEntry)value; + if (!isLocal(entry)) { + logger.info(id + " Remote endpoint removed: " + entry.getValue()); + } + endpointRemoved((Endpoint)entry.getValue()); + } + + public void entryUpdated(Object key, Object oldValue, Object newValue) { + MapEntry oldEntry = (MapEntry)oldValue; + MapEntry newEntry = (MapEntry)newValue; + if (!isLocal(newEntry)) { + logger.info(id + " Remote endpoint updated: " + newEntry.getValue()); + } + Endpoint oldEp = (Endpoint)oldEntry.getValue(); + Endpoint newEp = (Endpoint)newEntry.getValue(); + endpointUpdated(oldEp, newEp); + } + + private static String getBindAddress() { + try { + Enumeration<NetworkInterface> nis = NetworkInterface.getNetworkInterfaces(); + while (nis.hasMoreElements()) { + NetworkInterface ni = nis.nextElement(); + // The following APIs require JDK 1.6 + /* + if (ni.isLoopback() || !ni.isUp() || !ni.supportsMulticast()) { + continue; + } + */ + Enumeration<InetAddress> ips = ni.getInetAddresses(); + if (!ips.hasMoreElements()) { + continue; + } + while (ips.hasMoreElements()) { + InetAddress addr = ips.nextElement(); + if (addr.isLoopbackAddress()) { + continue; + } + return addr.getHostAddress(); + } + } + return InetAddress.getLocalHost().getHostAddress(); + } catch (Exception e) { + logger.log(Level.SEVERE, e.getMessage(), e); + return null; + } + } + +} diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java new file mode 100644 index 0000000000..669ad82192 --- /dev/null +++ b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tuscany.sca.endpoint.tribes; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.Map; + +import org.apache.catalina.tribes.Channel; +import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelListener; +import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.MembershipListener; +import org.apache.catalina.tribes.group.RpcCallback; + +/** + * This file is copied from: + * https://svn.apache.org/repos/asf/tomcat/tc6.0.x/tags/TOMCAT_6_0_20/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java + * + */ +public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback, ChannelListener, MembershipListener { + private static final long serialVersionUID = -6318779627600581121L; + protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(ReplicatedMap.class); + + //------------------------------------------------------------------------------ + // CONSTRUCTORS / DESTRUCTORS + //------------------------------------------------------------------------------ + /** + * Creates a new map + * @param channel The channel to use for communication + * @param timeout long - timeout for RPC messags + * @param mapContextName String - unique name for this map, to allow multiple maps per channel + * @param initialCapacity int - the size of this map, see HashMap + * @param loadFactor float - load factor, see HashMap + */ + public ReplicatedMap(MapOwner owner, + Channel channel, + long timeout, + String mapContextName, + int initialCapacity, + float loadFactor, + ClassLoader[] cls) { + super(owner, channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT, cls); + } + + /** + * Creates a new map + * @param channel The channel to use for communication + * @param timeout long - timeout for RPC messags + * @param mapContextName String - unique name for this map, to allow multiple maps per channel + * @param initialCapacity int - the size of this map, see HashMap + */ + public ReplicatedMap(MapOwner owner, + Channel channel, + long timeout, + String mapContextName, + int initialCapacity, + ClassLoader[] cls) { + super(owner, channel, timeout, mapContextName, initialCapacity, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, + Channel.SEND_OPTIONS_DEFAULT, cls); + } + + /** + * Creates a new map + * @param channel The channel to use for communication + * @param timeout long - timeout for RPC messags + * @param mapContextName String - unique name for this map, to allow multiple maps per channel + */ + public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, ClassLoader[] cls) { + super(owner, channel, timeout, mapContextName, AbstractReplicatedMap.DEFAULT_INITIAL_CAPACITY, + AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT, cls); + } + + //------------------------------------------------------------------------------ + // METHODS TO OVERRIDE + //------------------------------------------------------------------------------ + protected int getStateMessageType() { + return AbstractReplicatedMap.MapMessage.MSG_STATE_COPY; + } + + /** + * publish info about a map pair (key/value) to other nodes in the cluster + * @param key Object + * @param value Object + * @return Member - the backup node + * @throws ChannelException + */ + protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException { + if (!(key instanceof Serializable && value instanceof Serializable)) + return new Member[0]; + //select a backup node + Member[] members = getMapMembers(); + + if (members == null || members.length == 0) { + return new Member[0]; + } + + //publish the data out to all nodes + MapMessage msg = + new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable)key, (Serializable)value, + null, channel.getLocalMember(false), members); + + getChannel().send(members, msg, getChannelSendOptions()); + + return members; + } + + /** + * Override the base method to look up existing entries only + */ + public Object get(Object key) { + MapEntry entry = super.getInternal(key); + if (log.isTraceEnabled()) + log.trace("Requesting id:" + key + " entry:" + entry); + if (entry == null) { + return null; + } + return entry.getValue(); + } + + /** + * Override the base method to remove all entries owned by the member that disappeared + */ + public void memberDisappeared(Member member) { + boolean removed = false; + synchronized (mapMembers) { + removed = (mapMembers.remove(member) != null); + if (!removed) { + if (log.isDebugEnabled()) + log.debug("Member[" + member + "] disappeared, but was not present in the map."); + return; //the member was not part of our map. + } + } + + Iterator<Map.Entry<Object, Object>> i = super.entrySetFull().iterator(); + while (i.hasNext()) { + Map.Entry<Object, Object> e = i.next(); + MapEntry entry = (MapEntry)super.getInternal(e.getKey()); + if (entry == null) { + continue; + } + if (member.equals(entry.getPrimary())) { + if (log.isDebugEnabled()) + log.debug("[2] Primary disappeared"); + i.remove(); + } //end if + } //while + } +} diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/TribesDomainRegistryFactory.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/TribesDomainRegistryFactory.java new file mode 100644 index 0000000000..91314b4c37 --- /dev/null +++ b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/TribesDomainRegistryFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.endpoint.tribes; + +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.runtime.BaseDomainRegistryFactory; +import org.apache.tuscany.sca.runtime.EndpointRegistry; + +/** + * The utility responsible for finding the endpoint regstry by the scheme and creating instances for the + * given domain + */ +public class TribesDomainRegistryFactory extends BaseDomainRegistryFactory { + private final static String[] schemes = new String[] {"multicast", "tribes", "tuscany"}; + + /** + * @param extensionRegistry + */ + public TribesDomainRegistryFactory(ExtensionPointRegistry registry) { + super(registry); + } + + protected EndpointRegistry createEndpointRegistry(String endpointRegistryURI, String domainURI) { + EndpointRegistry endpointRegistry = + new ReplicatedEndpointRegistry(registry, null, endpointRegistryURI, domainURI); + return endpointRegistry; + } + + public String[] getSupportedSchemes() { + return schemes; + } +} |