From fb3dd9a9095abc40027318b1712d0184f6d60e73 Mon Sep 17 00:00:00 2001 From: antelder Date: Fri, 30 Oct 2009 08:00:47 +0000 Subject: Delete old M4 branch and start again to pick up all the recent trunk changes git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@831223 13f79535-47bb-0310-9956-ffa450edef68 --- .../sca/endpoint/tribes/AbstractReplicatedMap.java | 1564 -------------------- .../tuscany/sca/endpoint/tribes/MapStore.java | 83 -- .../tribes/ReplicatedEndpointRegistry.java | 451 ------ .../tuscany/sca/endpoint/tribes/ReplicatedMap.java | 125 -- 4 files changed, 2223 deletions(-) delete mode 100644 branches/sca-java-2.0-M4/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java delete mode 100644 branches/sca-java-2.0-M4/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java delete mode 100644 branches/sca-java-2.0-M4/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java delete mode 100644 branches/sca-java-2.0-M4/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java (limited to 'branches/sca-java-2.0-M4/modules/endpoint-tribes/src/main/java') diff --git a/branches/sca-java-2.0-M4/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java b/branches/sca-java-2.0-M4/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java deleted file mode 100644 index 98c6739854..0000000000 --- a/branches/sca-java-2.0-M4/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java +++ /dev/null @@ -1,1564 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tuscany.sca.endpoint.tribes; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.io.Serializable; -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.catalina.tribes.Channel; -import org.apache.catalina.tribes.ChannelException; -import org.apache.catalina.tribes.ChannelListener; -import org.apache.catalina.tribes.Heartbeat; -import org.apache.catalina.tribes.Member; -import org.apache.catalina.tribes.MembershipListener; -import org.apache.catalina.tribes.group.Response; -import org.apache.catalina.tribes.group.RpcCallback; -import org.apache.catalina.tribes.group.RpcChannel; -import org.apache.catalina.tribes.io.XByteBuffer; -import org.apache.catalina.tribes.membership.MemberImpl; -import org.apache.catalina.tribes.tipis.ReplicatedMapEntry; -import org.apache.catalina.tribes.util.Arrays; -import org.apache.juli.logging.Log; -import org.apache.juli.logging.LogFactory; - -/** - * This file is copied from: - * https://svn.apache.org/repos/asf/tomcat/tc6.0.x/tags/TOMCAT_6_0_20/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java - * - * We need to intercept the put/remove calls on the base Map to fire events - */ -public abstract class AbstractReplicatedMap extends MapStore implements RpcCallback, ChannelListener, - MembershipListener, Heartbeat { - private static final long serialVersionUID = -2703358020113899074L; - - protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class); - - /** - * The default initial capacity - MUST be a power of two. - */ - public static final int DEFAULT_INITIAL_CAPACITY = 16; - - /** - * The load factor used when none specified in constructor. - **/ - public static final float DEFAULT_LOAD_FACTOR = 0.75f; - - /** - * Used to identify the map - */ - final String chset = "ISO-8859-1"; - - //------------------------------------------------------------------------------ - // INSTANCE VARIABLES - //------------------------------------------------------------------------------ - protected abstract int getStateMessageType(); - - /** - * Timeout for RPC messages, how long we will wait for a reply - */ - protected transient long rpcTimeout = 5000; - /** - * Reference to the channel for sending messages - */ - protected transient Channel channel; - /** - * The RpcChannel to send RPC messages through - */ - protected transient RpcChannel rpcChannel; - /** - * The Map context name makes this map unique, this - * allows us to have more than one map shared - * through one channel - */ - protected transient byte[] mapContextName; - /** - * Has the state been transferred - */ - protected transient boolean stateTransferred = false; - /** - * Simple lock object for transfers - */ - protected transient Object stateMutex = new Object(); - /** - * A list of members in our map - */ - protected transient Map mapMembers = new HashMap(); - /** - * Our default send options - */ - protected transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT; - /** - * The owner of this map, ala a SessionManager for example - */ - protected transient MapOwner mapOwner; - /** - * External class loaders if serialization and deserialization is to be performed successfully. - */ - protected transient ClassLoader[] externalLoaders; - - /** - * The node we are currently backing up data to, this index will rotate - * on a round robin basis - */ - protected transient int currentNode = 0; - - /** - * Since the map keeps internal membership - * this is the timeout for a ping message to be responded to - * If a remote map doesn't respond within this timeframe, - * its considered dead. - */ - protected transient long accessTimeout = 5000; - - /** - * Readable string of the mapContextName value - */ - protected transient String mapname = ""; - - //------------------------------------------------------------------------------ - // map owner interface - //------------------------------------------------------------------------------ - - public static interface MapOwner { - public void objectMadePrimay(Object key, Object value); - } - - //------------------------------------------------------------------------------ - // CONSTRUCTORS - //------------------------------------------------------------------------------ - - /** - * Creates a new map - * @param channel The channel to use for communication - * @param timeout long - timeout for RPC messags - * @param mapContextName String - unique name for this map, to allow multiple maps per channel - * @param initialCapacity int - the size of this map, see HashMap - * @param loadFactor float - load factor, see HashMap - * @param cls - a list of classloaders to be used for deserialization of objects. - */ - public AbstractReplicatedMap(MapOwner owner, - Channel channel, - long timeout, - String mapContextName, - int initialCapacity, - float loadFactor, - int channelSendOptions, - ClassLoader[] cls) { - super(initialCapacity, loadFactor, 15); - init(owner, channel, mapContextName, timeout, channelSendOptions, cls); - - } - - /** - * Helper methods, wraps a single member in an array - * @param m Member - * @return Member[] - */ - protected Member[] wrap(Member m) { - if (m == null) - return new Member[0]; - else - return new Member[] {m}; - } - - /** - * Initializes the map by creating the RPC channel, registering itself as a channel listener - * This method is also responsible for initiating the state transfer - * @param owner Object - * @param channel Channel - * @param mapContextName String - * @param timeout long - * @param channelSendOptions int - * @param cls ClassLoader[] - */ - protected void init(MapOwner owner, - Channel channel, - String mapContextName, - long timeout, - int channelSendOptions, - ClassLoader[] cls) { - log.info("Initializing AbstractReplicatedMap with context name:" + mapContextName); - this.mapOwner = owner; - this.externalLoaders = cls; - this.channelSendOptions = channelSendOptions; - this.channel = channel; - this.rpcTimeout = timeout; - - try { - this.mapname = mapContextName; - //unique context is more efficient if it is stored as bytes - this.mapContextName = mapContextName.getBytes(chset); - } catch (UnsupportedEncodingException x) { - log.warn("Unable to encode mapContextName[" + mapContextName - + "] using getBytes(" - + chset - + ") using default getBytes()", x); - this.mapContextName = mapContextName.getBytes(); - } - if (log.isTraceEnabled()) - log.trace("Created Lazy Map with name:" + mapContextName - + ", bytes:" - + Arrays.toString(this.mapContextName)); - - //create an rpc channel and add the map as a listener - this.rpcChannel = new RpcChannel(this.mapContextName, channel, this); - //add this map as a message listener - this.channel.addChannelListener(this); - //listen for membership notifications - this.channel.addMembershipListener(this); - - try { - //broadcast our map, this just notifies other members of our existence - broadcast(MapMessage.MSG_INIT, true); - //transfer state from another map - transferState(); - //state is transferred, we are ready for messaging - broadcast(MapMessage.MSG_START, true); - } catch (ChannelException x) { - log.warn("Unable to send map start message."); - throw new RuntimeException("Unable to start replicated map.", x); - } - } - - /** - * Sends a ping out to all the members in the cluster, not just map members - * that this map is alive. - * @param timeout long - * @throws ChannelException - */ - protected void ping(long timeout) throws ChannelException { - //send out a map membership message, only wait for the first reply - MapMessage msg = - new MapMessage(this.mapContextName, MapMessage.MSG_INIT, false, null, null, null, channel - .getLocalMember(false), null); - if (channel.getMembers().length > 0) { - //send a ping, wait for all nodes to reply - Response[] resp = - rpcChannel.send(channel.getMembers(), - msg, - rpcChannel.ALL_REPLY, - (channelSendOptions), - (int)accessTimeout); - for (int i = 0; i < resp.length; i++) { - memberAlive(resp[i].getSource()); - } //for - } - //update our map of members, expire some if we didn't receive a ping back - synchronized (mapMembers) { - Iterator it = mapMembers.entrySet().iterator(); - long now = System.currentTimeMillis(); - while (it.hasNext()) { - Map.Entry entry = (Map.Entry)it.next(); - long access = ((Long)entry.getValue()).longValue(); - if ((now - access) > timeout) { - it.remove(); - memberDisappeared((Member)entry.getKey()); - } - } - }//synch - } - - /** - * We have received a member alive notification - * @param member Member - */ - protected void memberAlive(Member member) { - synchronized (mapMembers) { - if (!mapMembers.containsKey(member)) { - mapMemberAdded(member); - } //end if - mapMembers.put(member, new Long(System.currentTimeMillis())); - } - } - - /** - * Helper method to broadcast a message to all members in a channel - * @param msgtype int - * @param rpc boolean - * @throws ChannelException - */ - protected void broadcast(int msgtype, boolean rpc) throws ChannelException { - //send out a map membership message, only wait for the first reply - MapMessage msg = - new MapMessage(this.mapContextName, msgtype, false, null, null, null, channel.getLocalMember(false), null); - if (rpc) { - Response[] resp = - rpcChannel.send(channel.getMembers(), msg, rpcChannel.FIRST_REPLY, (channelSendOptions), rpcTimeout); - for (int i = 0; i < resp.length; i++) { - mapMemberAdded(resp[i].getSource()); - messageReceived(resp[i].getMessage(), resp[i].getSource()); - } - } else { - channel.send(channel.getMembers(), msg, channelSendOptions); - } - } - - public void breakdown() { - finalize(); - } - - public void finalize() { - try { - broadcast(MapMessage.MSG_STOP, false); - } catch (Exception ignore) { - } - //cleanup - if (this.rpcChannel != null) { - this.rpcChannel.breakdown(); - } - if (this.channel != null) { - this.channel.removeChannelListener(this); - this.channel.removeMembershipListener(this); - } - this.rpcChannel = null; - this.channel = null; - this.mapMembers.clear(); - super.clear(); - this.stateTransferred = false; - this.externalLoaders = null; - } - - public int hashCode() { - return Arrays.hashCode(this.mapContextName); - } - - public boolean equals(Object o) { - if (o == null) - return false; - if (!(o instanceof AbstractReplicatedMap)) - return false; - if (!(o.getClass().equals(this.getClass()))) - return false; - AbstractReplicatedMap other = (AbstractReplicatedMap)o; - return Arrays.equals(mapContextName, other.mapContextName); - } - - //------------------------------------------------------------------------------ - // GROUP COM INTERFACES - //------------------------------------------------------------------------------ - public Member[] getMapMembers(Map members) { - synchronized (members) { - Member[] result = new Member[members.size()]; - members.keySet().toArray(result); - return result; - } - } - - public Member[] getMapMembers() { - return getMapMembers(this.mapMembers); - } - - public Member[] getMapMembersExcl(Member[] exclude) { - synchronized (mapMembers) { - Map list = (Map)((HashMap)mapMembers).clone(); - for (int i = 0; i < exclude.length; i++) - list.remove(exclude[i]); - return getMapMembers(list); - } - } - - /** - * Replicates any changes to the object since the last time - * The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated
- * @param complete - if set to true, the object is replicated to its backup - * if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will - * be replicated - */ - public void replicate(Object key, boolean complete) { - if (log.isTraceEnabled()) - log.trace("Replicate invoked on key:" + key); - MapEntry entry = (MapEntry)super.get(key); - if (entry == null) - return; - if (!entry.isSerializable()) - return; - if (entry != null && entry.isPrimary() && entry.getBackupNodes() != null && entry.getBackupNodes().length > 0) { - Object value = entry.getValue(); - //check to see if we need to replicate this object isDirty()||complete - boolean repl = complete || ((value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDirty()); - - if (!repl) { - if (log.isTraceEnabled()) - log.trace("Not replicating:" + key + ", no change made"); - - return; - } - //check to see if the message is diffable - boolean diff = ((value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDiffable()); - MapMessage msg = null; - if (diff) { - ReplicatedMapEntry rentry = (ReplicatedMapEntry)entry.getValue(); - try { - rentry.lock(); - //construct a diff message - msg = - new MapMessage(mapContextName, MapMessage.MSG_BACKUP, true, (Serializable)entry.getKey(), null, - rentry.getDiff(), entry.getPrimary(), entry.getBackupNodes()); - } catch (IOException x) { - log.error("Unable to diff object. Will replicate the entire object instead.", x); - } finally { - rentry.unlock(); - } - - } - if (msg == null) { - //construct a complete - msg = - new MapMessage(mapContextName, MapMessage.MSG_BACKUP, false, (Serializable)entry.getKey(), - (Serializable)entry.getValue(), null, entry.getPrimary(), entry.getBackupNodes()); - - } - try { - if (channel != null && entry.getBackupNodes() != null && entry.getBackupNodes().length > 0) { - channel.send(entry.getBackupNodes(), msg, channelSendOptions); - } - } catch (ChannelException x) { - log.error("Unable to replicate data.", x); - } - } //end if - - } - - /** - * This can be invoked by a periodic thread to replicate out any changes. - * For maps that don't store objects that implement ReplicatedMapEntry, this - * method should be used infrequently to avoid large amounts of data transfer - * @param complete boolean - */ - public void replicate(boolean complete) { - Iterator> i = super.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry e = i.next(); - replicate(e.getKey(), complete); - } //while - - } - - public void transferState() { - try { - Member[] members = getMapMembers(); - Member backup = members.length > 0 ? (Member)members[0] : null; - if (backup != null) { - MapMessage msg = - new MapMessage(mapContextName, getStateMessageType(), false, null, null, null, null, null); - Response[] resp = - rpcChannel.send(new Member[] {backup}, msg, RpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout); - if (resp.length > 0) { - synchronized (stateMutex) { - msg = (MapMessage)resp[0].getMessage(); - msg.deserialize(getExternalLoaders()); - List list = (List)msg.getValue(); - for (int i = 0; i < list.size(); i++) { - messageReceived((Serializable)list.get(i), resp[0].getSource()); - } //for - } - } else { - log.warn("Transfer state, 0 replies, probably a timeout."); - } - } - } catch (ChannelException x) { - log.error("Unable to transfer LazyReplicatedMap state.", x); - } catch (IOException x) { - log.error("Unable to transfer LazyReplicatedMap state.", x); - } catch (ClassNotFoundException x) { - log.error("Unable to transfer LazyReplicatedMap state.", x); - } - stateTransferred = true; - } - - /** - * @todo implement state transfer - * @param msg Serializable - * @return Serializable - null if no reply should be sent - */ - public Serializable replyRequest(Serializable msg, final Member sender) { - if (!(msg instanceof MapMessage)) - return null; - MapMessage mapmsg = (MapMessage)msg; - - //map init request - if (mapmsg.getMsgType() == MapMessage.MSG_INIT) { - mapmsg.setPrimary(channel.getLocalMember(false)); - return mapmsg; - } - - //map start request - if (mapmsg.getMsgType() == MapMessage.MSG_START) { - mapmsg.setPrimary(channel.getLocalMember(false)); - mapMemberAdded(sender); - return mapmsg; - } - - //backup request - if (mapmsg.getMsgType() == MapMessage.MSG_RETRIEVE_BACKUP) { - MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); - if (entry == null || (!entry.isSerializable())) - return null; - mapmsg.setValue((Serializable)entry.getValue()); - return mapmsg; - } - - //state transfer request - if (mapmsg.getMsgType() == MapMessage.MSG_STATE || mapmsg.getMsgType() == MapMessage.MSG_STATE_COPY) { - synchronized (stateMutex) { //make sure we dont do two things at the same time - ArrayList list = new ArrayList(); - Iterator> i = super.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry e = i.next(); - MapEntry entry = (MapEntry)super.get(e.getKey()); - if (entry != null && entry.isSerializable()) { - boolean copy = (mapmsg.getMsgType() == MapMessage.MSG_STATE_COPY); - MapMessage me = - new MapMessage(mapContextName, copy ? MapMessage.MSG_COPY : MapMessage.MSG_PROXY, false, - (Serializable)entry.getKey(), copy ? (Serializable)entry.getValue() : null, - null, entry.getPrimary(), entry.getBackupNodes()); - list.add(me); - } - } - mapmsg.setValue(list); - return mapmsg; - - } //synchronized - } - - return null; - - } - - /** - * If the reply has already been sent to the requesting thread, - * the rpc callback can handle any data that comes in after the fact. - * @param msg Serializable - * @param sender Member - */ - public void leftOver(Serializable msg, Member sender) { - //left over membership messages - if (!(msg instanceof MapMessage)) - return; - - MapMessage mapmsg = (MapMessage)msg; - try { - mapmsg.deserialize(getExternalLoaders()); - if (mapmsg.getMsgType() == MapMessage.MSG_START) { - mapMemberAdded(mapmsg.getPrimary()); - } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) { - memberAlive(mapmsg.getPrimary()); - } - } catch (IOException x) { - log.error("Unable to deserialize MapMessage.", x); - } catch (ClassNotFoundException x) { - log.error("Unable to deserialize MapMessage.", x); - } - } - - public void messageReceived(Serializable msg, Member sender) { - if (!(msg instanceof MapMessage)) - return; - - MapMessage mapmsg = (MapMessage)msg; - if (log.isTraceEnabled()) { - log.trace("Map[" + mapname + "] received message:" + mapmsg); - } - - try { - mapmsg.deserialize(getExternalLoaders()); - } catch (IOException x) { - log.error("Unable to deserialize MapMessage.", x); - return; - } catch (ClassNotFoundException x) { - log.error("Unable to deserialize MapMessage.", x); - return; - } - if (log.isTraceEnabled()) - log.trace("Map message received from:" + sender.getName() + " msg:" + mapmsg); - if (mapmsg.getMsgType() == MapMessage.MSG_START) { - mapMemberAdded(mapmsg.getPrimary()); - } - - if (mapmsg.getMsgType() == MapMessage.MSG_STOP) { - memberDisappeared(mapmsg.getPrimary()); - } - - if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) { - MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); - if (entry == null) { - entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue()); - entry.setBackup(false); - entry.setProxy(true); - entry.setBackupNodes(mapmsg.getBackupNodes()); - entry.setPrimary(mapmsg.getPrimary()); - super.put(entry.getKey(), entry); - } else { - entry.setProxy(true); - entry.setBackup(false); - entry.setBackupNodes(mapmsg.getBackupNodes()); - entry.setPrimary(mapmsg.getPrimary()); - } - } - - if (mapmsg.getMsgType() == MapMessage.MSG_REMOVE) { - super.remove(mapmsg.getKey()); - } - - if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP || mapmsg.getMsgType() == MapMessage.MSG_COPY) { - MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); - if (entry == null) { - entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue()); - entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP); - entry.setProxy(false); - entry.setBackupNodes(mapmsg.getBackupNodes()); - entry.setPrimary(mapmsg.getPrimary()); - if (mapmsg.getValue() != null && mapmsg.getValue() instanceof ReplicatedMapEntry) { - ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner()); - } - } else { - entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP); - entry.setProxy(false); - entry.setBackupNodes(mapmsg.getBackupNodes()); - entry.setPrimary(mapmsg.getPrimary()); - if (entry.getValue() instanceof ReplicatedMapEntry) { - ReplicatedMapEntry diff = (ReplicatedMapEntry)entry.getValue(); - if (mapmsg.isDiff()) { - try { - diff.lock(); - diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length); - } catch (Exception x) { - log.error("Unable to apply diff to key:" + entry.getKey(), x); - } finally { - diff.unlock(); - } - } else { - if (mapmsg.getValue() != null) - entry.setValue(mapmsg.getValue()); - ((ReplicatedMapEntry)entry.getValue()).setOwner(getMapOwner()); - } //end if - } else if (mapmsg.getValue() instanceof ReplicatedMapEntry) { - ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue(); - re.setOwner(getMapOwner()); - entry.setValue(re); - } else { - if (mapmsg.getValue() != null) - entry.setValue(mapmsg.getValue()); - } //end if - } //end if - super.put(entry.getKey(), entry); - } //end if - } - - public boolean accept(Serializable msg, Member sender) { - boolean result = false; - if (msg instanceof MapMessage) { - if (log.isTraceEnabled()) - log.trace("Map[" + mapname + "] accepting...." + msg); - result = Arrays.equals(mapContextName, ((MapMessage)msg).getMapId()); - if (log.isTraceEnabled()) - log.trace("Msg[" + mapname + "] accepted[" + result + "]...." + msg); - } - return result; - } - - public void mapMemberAdded(Member member) { - Member self = getChannel().getLocalMember(false); - if (member.equals(self)) - return; - boolean memberAdded = false; - //select a backup node if we don't have one - synchronized (mapMembers) { - if (!mapMembers.containsKey(member)) { - mapMembers.put(member, new Long(System.currentTimeMillis())); - memberAdded = true; - } - } - if (memberAdded) { - synchronized (stateMutex) { - Iterator> i = super.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry e = i.next(); - MapEntry entry = (MapEntry)super.get(e.getKey()); - if (entry == null) - continue; - // if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) { - // [rfeng] Change the behavior to replicate to all nodes - if (entry.isPrimary() && self.equals(entry.getPrimary())) { - try { - Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes()); - entry.setBackupNodes(backup); - entry.setPrimary(self); - } catch (ChannelException x) { - log.error("Unable to select backup node.", x); - } //catch - } //end if - } //while - } //synchronized - }//end if - } - - public boolean inSet(Member m, Member[] set) { - if (set == null) - return false; - boolean result = false; - for (int i = 0; i < set.length && (!result); i++) - if (m.equals(set[i])) - result = true; - return result; - } - - public Member[] excludeFromSet(Member[] mbrs, Member[] set) { - List result = new ArrayList(); - for (int i = 0; i < set.length; i++) { - boolean include = true; - for (int j = 0; j < mbrs.length; j++) - if (mbrs[j].equals(set[i])) - include = false; - if (include) - result.add(set[i]); - } - return (Member[])result.toArray(new Member[result.size()]); - } - - public void memberAdded(Member member) { - //do nothing - } - - public void memberDisappeared(Member member) { - boolean removed = false; - synchronized (mapMembers) { - removed = (mapMembers.remove(member) != null); - if (!removed) { - if (log.isDebugEnabled()) - log.debug("Member[" + member + "] disappeared, but was not present in the map."); - return; //the member was not part of our map. - } - } - - Iterator> i = super.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry e = i.next(); - MapEntry entry = (MapEntry)super.get(e.getKey()); - if (entry == null) - continue; - if (entry.isPrimary() && inSet(member, entry.getBackupNodes())) { - if (log.isDebugEnabled()) - log.debug("[1] Primary choosing a new backup"); - try { - Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes()); - entry.setBackupNodes(backup); - entry.setPrimary(channel.getLocalMember(false)); - } catch (ChannelException x) { - log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x); - } - } else if (member.equals(entry.getPrimary())) { - if (log.isDebugEnabled()) - log.debug("[2] Primary disappeared"); - entry.setPrimary(null); - } //end if - - if (entry.isProxy() && entry.getPrimary() == null - && entry.getBackupNodes() != null - && entry.getBackupNodes().length == 1 - && entry.getBackupNodes()[0].equals(member)) { - //remove proxies that have no backup nor primaries - if (log.isDebugEnabled()) - log.debug("[3] Removing orphaned proxy"); - i.remove(); - } else if (entry.getPrimary() == null && entry.isBackup() - && entry.getBackupNodes() != null - && entry.getBackupNodes().length == 1 - && entry.getBackupNodes()[0].equals(channel.getLocalMember(false))) { - try { - if (log.isDebugEnabled()) - log.debug("[4] Backup becoming primary"); - entry.setPrimary(channel.getLocalMember(false)); - entry.setBackup(false); - entry.setProxy(false); - Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes()); - entry.setBackupNodes(backup); - if (mapOwner != null) - mapOwner.objectMadePrimay(entry.getKey(), entry.getValue()); - - } catch (ChannelException x) { - log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x); - } - } - - } //while - } - - public int getNextBackupIndex() { - int size = mapMembers.size(); - if (mapMembers.size() == 0) - return -1; - int node = currentNode++; - if (node >= size) { - node = 0; - currentNode = 0; - } - return node; - } - - public Member getNextBackupNode() { - Member[] members = getMapMembers(); - int node = getNextBackupIndex(); - if (members.length == 0 || node == -1) - return null; - if (node >= members.length) - node = 0; - return members[node]; - } - - protected abstract Member[] publishEntryInfo(Object key, Object value, Member[] backupNodes) throws ChannelException; - - public void heartbeat() { - try { - ping(accessTimeout); - } catch (Exception x) { - log.error("Unable to send AbstractReplicatedMap.ping message", x); - } - } - - //------------------------------------------------------------------------------ - // METHODS TO OVERRIDE - //------------------------------------------------------------------------------ - - /** - * Removes an object from this map, it will also remove it from - * - * @param key Object - * @return Object - */ - public Object remove(Object key) { - return remove(key, true); - } - - public Object remove(Object key, boolean notify) { - MapEntry entry = (MapEntry)super.remove(key); - - try { - if (getMapMembers().length > 0 && notify) { - MapMessage msg = - new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable)key, null, null, - null, null); - getChannel().send(getMapMembers(), msg, getChannelSendOptions()); - } - } catch (ChannelException x) { - log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation", x); - } - return entry != null ? entry.getValue() : null; - } - - public MapEntry getInternal(Object key) { - return (MapEntry)super.get(key); - } - - public Object get(Object key) { - MapEntry entry = (MapEntry)super.get(key); - if (log.isTraceEnabled()) - log.trace("Requesting id:" + key + " entry:" + entry); - if (entry == null) - return null; - if (!entry.isPrimary()) { - //if the message is not primary, we need to retrieve the latest value - try { - Member[] backup = null; - MapMessage msg = null; - if (!entry.isBackup()) { - //make sure we don't retrieve from ourselves - msg = - new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false, (Serializable)key, - null, null, null, null); - Response[] resp = - getRpcChannel().send(entry.getBackupNodes(), - msg, - RpcChannel.FIRST_REPLY, - Channel.SEND_OPTIONS_DEFAULT, - getRpcTimeout()); - if (resp == null || resp.length == 0) { - //no responses - log.warn("Unable to retrieve remote object for key:" + key); - return null; - } - msg = (MapMessage)resp[0].getMessage(); - msg.deserialize(getExternalLoaders()); - backup = entry.getBackupNodes(); - if (entry.getValue() instanceof ReplicatedMapEntry) { - ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue(); - val.setOwner(getMapOwner()); - } - if (msg.getValue() != null) - entry.setValue(msg.getValue()); - } - if (entry.isBackup()) { - //select a new backup node - backup = publishEntryInfo(key, entry.getValue(), entry.getBackupNodes()); - } else if (entry.isProxy()) { - //invalidate the previous primary - msg = - new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false, (Serializable)key, null, null, - channel.getLocalMember(false), backup); - Member[] dest = getMapMembersExcl(backup); - if (dest != null && dest.length > 0) { - getChannel().send(dest, msg, getChannelSendOptions()); - } - } - entry.setPrimary(channel.getLocalMember(false)); - entry.setBackupNodes(backup); - entry.setBackup(false); - entry.setProxy(false); - - } catch (Exception x) { - log.error("Unable to replicate out data for a LazyReplicatedMap.get operation", x); - return null; - } - } - if (log.isTraceEnabled()) - log.trace("Requesting id:" + key + " result:" + entry.getValue()); - if (entry.getValue() != null && entry.getValue() instanceof ReplicatedMapEntry) { - ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue(); - //hack, somehow this is not being set above - val.setOwner(getMapOwner()); - - } - return entry.getValue(); - } - - protected void printMap(String header) { - try { - System.out.println("\nDEBUG MAP:" + header); - System.out.println("Map[" + new String(mapContextName, chset) + ", Map Size:" + super.size()); - Member[] mbrs = getMapMembers(); - for (int i = 0; i < mbrs.length; i++) { - System.out.println("Mbr[" + (i + 1) + "=" + mbrs[i].getName()); - } - Iterator i = super.entrySet().iterator(); - int cnt = 0; - - while (i.hasNext()) { - Map.Entry e = (Map.Entry)i.next(); - System.out.println((++cnt) + ". " + super.get(e.getKey())); - } - System.out.println("EndMap]\n\n"); - } catch (Exception ignore) { - ignore.printStackTrace(); - } - } - - /** - * Returns true if the key has an entry in the map. - * The entry can be a proxy or a backup entry, invoking get(key) - * will make this entry primary for the group - * @param key Object - * @return boolean - */ - public boolean containsKey(Object key) { - return super.containsKey(key); - } - - public Object put(Object key, Object value) { - return put(key, value, true); - } - - public Object put(Object key, Object value, boolean notify) { - MapEntry entry = new MapEntry(key, value); - entry.setBackup(false); - entry.setProxy(false); - entry.setPrimary(channel.getLocalMember(false)); - - Object old = null; - - //make sure that any old values get removed - if (containsKey(key)) - old = remove(key); - try { - if (notify) { - Member[] backup = publishEntryInfo(key, value, entry.getBackupNodes()); - entry.setBackupNodes(backup); - } - } catch (ChannelException x) { - log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x); - } - super.put(key, entry); - return old; - } - - /** - * Copies all values from one map to this instance - * @param m Map - */ - public void putAll(Map m) { - Iterator i = m.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry entry = (Map.Entry)i.next(); - put(entry.getKey(), entry.getValue()); - } - } - - public void clear() { - clear(true); - } - - public void clear(boolean notify) { - if (notify) { - //only delete active keys - Iterator keys = keySet().iterator(); - while (keys.hasNext()) - remove(keys.next()); - } else { - super.clear(); - } - } - - public boolean containsValue(Object value) { - if (value == null) { - return super.containsValue(value); - } else { - Iterator> i = super.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry e = i.next(); - MapEntry entry = (MapEntry)super.get(e.getKey()); - if (entry != null && entry.isPrimary() && value.equals(entry.getValue())) - return true; - }//while - return false; - }//end if - } - - public Object clone() { - throw new UnsupportedOperationException("This operation is not valid on a replicated map"); - } - - /** - * Returns the entire contents of the map - * Map.Entry.getValue() will return a LazyReplicatedMap.MapEntry object containing all the information - * about the object. - * @return Set - */ - public Set> entrySetFull() { - return super.entrySet(); - } - - public Set keySetFull() { - return super.keySet(); - } - - public int sizeFull() { - return super.size(); - } - - public Set> entrySet() { - Set> set = new LinkedHashSet>(super.size()); - Iterator> i = super.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry e = i.next(); - Object key = e.getKey(); - MapEntry entry = (MapEntry)super.get(key); - if (entry != null && entry.isPrimary()) { - set.add(new MapEntry(key, entry.getValue())); - } - } - return Collections.unmodifiableSet(set); - } - - public Set keySet() { - //todo implement - //should only return keys where this is active. - LinkedHashSet set = new LinkedHashSet(super.size()); - Iterator> i = super.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry e = i.next(); - Object key = e.getKey(); - MapEntry entry = (MapEntry)super.get(key); - if (entry != null && entry.isPrimary()) - set.add(key); - } - return Collections.unmodifiableSet(set); - - } - - public int size() { - //todo, implement a counter variable instead - //only count active members in this node - int counter = 0; - Iterator> it = super.entrySet().iterator(); - while (it != null && it.hasNext()) { - Map.Entry e = it.next(); - if (e != null) { - MapEntry entry = (MapEntry)super.get(e.getKey()); - if (entry != null && entry.isPrimary() && entry.getValue() != null) - counter++; - } - } - return counter; - } - - protected boolean removeEldestEntry(Map.Entry eldest) { - return false; - } - - public boolean isEmpty() { - return size() == 0; - } - - public Collection values() { - ArrayList values = new ArrayList(); - Iterator> i = super.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry e = i.next(); - MapEntry entry = (MapEntry)super.get(e.getKey()); - if (entry != null && entry.isPrimary() && entry.getValue() != null) - values.add(entry.getValue()); - } - return Collections.unmodifiableCollection(values); - } - - //------------------------------------------------------------------------------ - // Map Entry class - //------------------------------------------------------------------------------ - public static class MapEntry implements Map.Entry { - private boolean backup; - private boolean proxy; - private Member[] backupNodes; - private Member primary; - private Object key; - private Object value; - - public MapEntry(Object key, Object value) { - setKey(key); - setValue(value); - - } - - public boolean isKeySerializable() { - return (key == null) || (key instanceof Serializable); - } - - public boolean isValueSerializable() { - return (value == null) || (value instanceof Serializable); - } - - public boolean isSerializable() { - return isKeySerializable() && isValueSerializable(); - } - - public boolean isBackup() { - return backup; - } - - public void setBackup(boolean backup) { - this.backup = backup; - } - - public boolean isProxy() { - return proxy; - } - - public boolean isPrimary() { - return ((!proxy) && (!backup)); - } - - public void setProxy(boolean proxy) { - this.proxy = proxy; - } - - public boolean isDiffable() { - return (value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDiffable(); - } - - public void setBackupNodes(Member[] nodes) { - this.backupNodes = nodes; - } - - public Member[] getBackupNodes() { - return backupNodes; - } - - public void setPrimary(Member m) { - primary = m; - } - - public Member getPrimary() { - return primary; - } - - public Object getValue() { - return value; - } - - public Object setValue(Object value) { - Object old = this.value; - this.value = value; - return old; - } - - public Object getKey() { - return key; - } - - public Object setKey(Object key) { - Object old = this.key; - this.key = key; - return old; - } - - public int hashCode() { - return key.hashCode(); - } - - public boolean equals(Object o) { - return key.equals(o); - } - - /** - * apply a diff, or an entire object - * @param data byte[] - * @param offset int - * @param length int - * @param diff boolean - * @throws IOException - * @throws ClassNotFoundException - */ - public void apply(byte[] data, int offset, int length, boolean diff) throws IOException, ClassNotFoundException { - if (isDiffable() && diff) { - ReplicatedMapEntry rentry = (ReplicatedMapEntry)value; - try { - rentry.lock(); - rentry.applyDiff(data, offset, length); - } finally { - rentry.unlock(); - } - } else if (length == 0) { - value = null; - proxy = true; - } else { - value = XByteBuffer.deserialize(data, offset, length); - } - } - - public String toString() { - StringBuffer buf = new StringBuffer("MapEntry[key:"); - buf.append(getKey()).append("; "); - buf.append("value:").append(getValue()).append("; "); - buf.append("primary:").append(isPrimary()).append("; "); - buf.append("backup:").append(isBackup()).append("; "); - buf.append("proxy:").append(isProxy()).append(";]"); - return buf.toString(); - } - - } - - //------------------------------------------------------------------------------ - // map message to send to and from other maps - //------------------------------------------------------------------------------ - - public static class MapMessage implements Serializable { - private static final long serialVersionUID = -7847288807489375686L; - public static final int MSG_BACKUP = 1; - public static final int MSG_RETRIEVE_BACKUP = 2; - public static final int MSG_PROXY = 3; - public static final int MSG_REMOVE = 4; - public static final int MSG_STATE = 5; - public static final int MSG_START = 6; - public static final int MSG_STOP = 7; - public static final int MSG_INIT = 8; - public static final int MSG_COPY = 9; - public static final int MSG_STATE_COPY = 10; - - private byte[] mapId; - private int msgtype; - private boolean diff; - private transient Serializable key; - private transient Serializable value; - private byte[] valuedata; - private byte[] keydata; - private byte[] diffvalue; - private Member[] nodes; - private Member primary; - - public String toString() { - StringBuffer buf = new StringBuffer("MapMessage[context="); - buf.append(new String(mapId)); - buf.append("; type="); - buf.append(getTypeDesc()); - buf.append("; key="); - buf.append(key); - buf.append("; value="); - buf.append(value); - return buf.toString(); - } - - public String getTypeDesc() { - switch (msgtype) { - case MSG_BACKUP: - return "MSG_BACKUP"; - case MSG_RETRIEVE_BACKUP: - return "MSG_RETRIEVE_BACKUP"; - case MSG_PROXY: - return "MSG_PROXY"; - case MSG_REMOVE: - return "MSG_REMOVE"; - case MSG_STATE: - return "MSG_STATE"; - case MSG_START: - return "MSG_START"; - case MSG_STOP: - return "MSG_STOP"; - case MSG_INIT: - return "MSG_INIT"; - case MSG_STATE_COPY: - return "MSG_STATE_COPY"; - case MSG_COPY: - return "MSG_COPY"; - default: - return "UNKNOWN"; - } - } - - public MapMessage() { - } - - public MapMessage(byte[] mapId, - int msgtype, - boolean diff, - Serializable key, - Serializable value, - byte[] diffvalue, - Member primary, - Member[] nodes) { - this.mapId = mapId; - this.msgtype = msgtype; - this.diff = diff; - this.key = key; - this.value = value; - this.diffvalue = diffvalue; - this.nodes = nodes; - this.primary = primary; - setValue(value); - setKey(key); - } - - public void deserialize(ClassLoader[] cls) throws IOException, ClassNotFoundException { - key(cls); - value(cls); - } - - public int getMsgType() { - return msgtype; - } - - public boolean isDiff() { - return diff; - } - - public Serializable getKey() { - try { - return key(null); - } catch (Exception x) { - log.error("Deserialization error of the MapMessage.key", x); - return null; - } - } - - public Serializable key(ClassLoader[] cls) throws IOException, ClassNotFoundException { - if (key != null) - return key; - if (keydata == null || keydata.length == 0) - return null; - key = XByteBuffer.deserialize(keydata, 0, keydata.length, cls); - keydata = null; - return key; - } - - public byte[] getKeyData() { - return keydata; - } - - public Serializable getValue() { - try { - return value(null); - } catch (Exception x) { - log.error("Deserialization error of the MapMessage.value", x); - return null; - } - } - - public Serializable value(ClassLoader[] cls) throws IOException, ClassNotFoundException { - if (value != null) - return value; - if (valuedata == null || valuedata.length == 0) - return null; - value = XByteBuffer.deserialize(valuedata, 0, valuedata.length, cls); - valuedata = null; - ; - return value; - } - - public byte[] getValueData() { - return valuedata; - } - - public byte[] getDiffValue() { - return diffvalue; - } - - public Member[] getBackupNodes() { - return nodes; - } - - private void setBackUpNodes(Member[] nodes) { - this.nodes = nodes; - } - - public Member getPrimary() { - return primary; - } - - private void setPrimary(Member m) { - primary = m; - } - - public byte[] getMapId() { - return mapId; - } - - public void setValue(Serializable value) { - try { - if (value != null) - valuedata = XByteBuffer.serialize(value); - this.value = value; - } catch (IOException x) { - throw new RuntimeException(x); - } - } - - public void setKey(Serializable key) { - try { - if (key != null) - keydata = XByteBuffer.serialize(key); - this.key = key; - } catch (IOException x) { - throw new RuntimeException(x); - } - } - - protected Member[] readMembers(ObjectInput in) throws IOException, ClassNotFoundException { - int nodecount = in.readInt(); - Member[] members = new Member[nodecount]; - for (int i = 0; i < members.length; i++) { - byte[] d = new byte[in.readInt()]; - in.read(d); - if (d.length > 0) - members[i] = MemberImpl.getMember(d); - } - return members; - } - - protected void writeMembers(ObjectOutput out, Member[] members) throws IOException { - if (members == null) - members = new Member[0]; - out.writeInt(members.length); - for (int i = 0; i < members.length; i++) { - if (members[i] != null) { - byte[] d = members[i] != null ? ((MemberImpl)members[i]).getData(false) : new byte[0]; - out.writeInt(d.length); - out.write(d); - } - } - } - - /** - * shallow clone - * @return Object - */ - public Object clone() { - MapMessage msg = - new MapMessage(this.mapId, this.msgtype, this.diff, this.key, this.value, this.diffvalue, this.primary, - this.nodes); - msg.keydata = this.keydata; - msg.valuedata = this.valuedata; - return msg; - } - } //MapMessage - - public Channel getChannel() { - return channel; - } - - public byte[] getMapContextName() { - return mapContextName; - } - - public RpcChannel getRpcChannel() { - return rpcChannel; - } - - public long getRpcTimeout() { - return rpcTimeout; - } - - public Object getStateMutex() { - return stateMutex; - } - - public boolean isStateTransferred() { - return stateTransferred; - } - - public MapOwner getMapOwner() { - return mapOwner; - } - - public ClassLoader[] getExternalLoaders() { - return externalLoaders; - } - - public int getChannelSendOptions() { - return channelSendOptions; - } - - public long getAccessTimeout() { - return accessTimeout; - } - - public void setMapOwner(MapOwner mapOwner) { - this.mapOwner = mapOwner; - } - - public void setExternalLoaders(ClassLoader[] externalLoaders) { - this.externalLoaders = externalLoaders; - } - - public void setChannelSendOptions(int channelSendOptions) { - this.channelSendOptions = channelSendOptions; - } - - public void setAccessTimeout(long accessTimeout) { - this.accessTimeout = accessTimeout; - } - -} diff --git a/branches/sca-java-2.0-M4/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java b/branches/sca-java-2.0-M4/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java deleted file mode 100644 index 9e540743bf..0000000000 --- a/branches/sca-java-2.0-M4/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.endpoint.tribes; - -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; - -/** - * The Map that can fire events on put/remove of entries - */ -public abstract class MapStore extends ConcurrentHashMap { - private static final long serialVersionUID = -2127235547082144368L; - private List listeners = new CopyOnWriteArrayList(); - - protected MapStore(int initialCapacity, float loadFactor, int concurrencyLevel) { - super(initialCapacity, loadFactor, concurrencyLevel); - } - - @Override - public Object put(Object key, Object value) { - Object old = super.put(key, value); - if (old != null) { - for (MapListener listener : listeners) { - listener.entryUpdated(key, old, value); - } - } else { - for (MapListener listener : listeners) { - listener.entryAdded(key, value); - } - - } - return old; - } - - @Override - public Object remove(Object key) { - Object old = super.remove(key); - if (old != null) { - for (MapListener listener : listeners) { - listener.entryRemoved(key, old); - } - } - return old; - } - - public void addListener(MapListener listener) { - listeners.add(listener); - } - - public List getListeners() { - return listeners; - } - - public boolean removeListener(MapListener listener) { - return listeners.remove(listener); - } - - public static interface MapListener { - void entryAdded(Object key, Object value); - - void entryUpdated(Object key, Object oldValue, Object newValue); - - void entryRemoved(Object key, Object value); - } -} diff --git a/branches/sca-java-2.0-M4/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java b/branches/sca-java-2.0-M4/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java deleted file mode 100644 index 3a703bca62..0000000000 --- a/branches/sca-java-2.0-M4/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java +++ /dev/null @@ -1,451 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.endpoint.tribes; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.net.URI; -import java.net.URLDecoder; -import java.util.ArrayList; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.StringTokenizer; -import java.util.UUID; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.catalina.tribes.Channel; -import org.apache.catalina.tribes.ChannelException; -import org.apache.catalina.tribes.Member; -import org.apache.catalina.tribes.group.GroupChannel; -import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor; -import org.apache.catalina.tribes.membership.McastService; -import org.apache.catalina.tribes.membership.StaticMember; -import org.apache.tuscany.sca.assembly.Endpoint; -import org.apache.tuscany.sca.assembly.EndpointReference; -import org.apache.tuscany.sca.core.ExtensionPointRegistry; -import org.apache.tuscany.sca.core.LifeCycleListener; -import org.apache.tuscany.sca.endpoint.tribes.AbstractReplicatedMap.MapEntry; -import org.apache.tuscany.sca.endpoint.tribes.MapStore.MapListener; -import org.apache.tuscany.sca.runtime.EndpointListener; -import org.apache.tuscany.sca.runtime.EndpointRegistry; - -/** - * A replicated EndpointRegistry based on Apache Tomcat Tribes - */ -public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleListener, MapListener { - private final static Logger logger = Logger.getLogger(ReplicatedEndpointRegistry.class.getName()); - private static final String MULTICAST_ADDRESS = "228.0.0.100"; - private static final int MULTICAST_PORT = 50000; - - private int port = MULTICAST_PORT; - private String address = MULTICAST_ADDRESS; - private String bind = null; - private int timeout = 50; - - private final static String DEFAULT_DOMAIN_URI = "http://tuscany.apache.org/sca/1.1/domains/default"; - private String domainURI = DEFAULT_DOMAIN_URI; - private List endpointreferences = new CopyOnWriteArrayList(); - private List listeners = new CopyOnWriteArrayList(); - - private ExtensionPointRegistry registry; - private ReplicatedMap map; - private static List staticRoutes; - - private String id; - - private static final Channel createChannel(String address, int port, String bindAddress) { - - //create a channel - GroupChannel channel = new GroupChannel(); - McastService mcastService = (McastService)channel.getMembershipService(); - mcastService.setPort(port); - mcastService.setAddress(address); - - // REVIEW: In my case, there are multiple IP addresses - // One for the WIFI and the other one for VPN. For some reason the VPN one doesn't support - // Multicast - - if (bindAddress != null) { - mcastService.setBind(bindAddress); - } else { - mcastService.setBind(getBindAddress()); - } - - return channel; - } - - public ReplicatedEndpointRegistry(ExtensionPointRegistry registry, - Map attributes, - String domainRegistryURI, - String domainURI) { - this.registry = registry; - this.domainURI = domainURI; - this.id = "[" + System.identityHashCode(this) + "]"; - getParameters(attributes, domainRegistryURI); - } - - private Map getParameters(Map attributes, String domainRegistryURI) { - Map map = new HashMap(); - if (attributes != null) { - map.putAll(attributes); - } - URI uri = URI.create(domainRegistryURI); - if (uri.getHost() != null) { - map.put("address", uri.getHost()); - } - if (uri.getPort() != -1) { - map.put("port", String.valueOf(uri.getPort())); - } - int index = domainRegistryURI.indexOf('?'); - if (index == -1) { - setConfig(map); - return map; - } - String query = domainRegistryURI.substring(index + 1); - try { - query = URLDecoder.decode(query, "UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new IllegalArgumentException(e); - } - String[] params = query.split("&"); - for (String param : params) { - index = param.indexOf('='); - if (index != -1) { - map.put(param.substring(0, index), param.substring(index + 1)); - } - } - setConfig(map); - return map; - } - - private void setConfig(Map attributes) { - String portStr = attributes.get("port"); - if (portStr != null) { - port = Integer.parseInt(portStr); - if (port == -1) { - port = MULTICAST_PORT; - } - } - String address = attributes.get("address"); - if (address == null) { - address = MULTICAST_ADDRESS; - } - bind = attributes.get("bind"); - String timeoutStr = attributes.get("timeout"); - if (timeoutStr != null) { - timeout = Integer.parseInt(timeoutStr); - } - - String routesStr = attributes.get("routes"); - if (routesStr != null) { - StringTokenizer st = new StringTokenizer(routesStr); - staticRoutes = new ArrayList(); - while (st.hasMoreElements()) { - staticRoutes.add(URI.create("tcp://" + st.nextToken())); - } - } - } - - public void start() { - if (map != null) { - throw new IllegalStateException("The registry has already been started"); - } - map = - new ReplicatedMap(null, createChannel(address, port, bind), timeout, this.domainURI, - new ClassLoader[] {ReplicatedEndpointRegistry.class.getClassLoader()}); - map.addListener(this); - - if (staticRoutes != null) { - StaticMembershipInterceptor smi = new StaticMembershipInterceptor(); - for (URI staticRoute : staticRoutes) { - Member member; - try { - member = new StaticMember(staticRoute.getHost(), staticRoute.getPort(), 5000); - } catch (IOException e) { - throw new RuntimeException(e); - } - smi.addStaticMember(member); - logger.info("Added static route: " + staticRoute.getHost() + ":" + staticRoute.getPort()); - } - smi.setLocalMember(map.getChannel().getLocalMember(false)); - map.getChannel().addInterceptor(smi); - } - - try { - map.getChannel().start(Channel.DEFAULT); - } catch (ChannelException e) { - throw new IllegalStateException(e); - } - - } - - public void stop() { - if (map != null) { - map.removeListener(this); - Channel channel = map.getChannel(); - map.breakdown(); - try { - channel.stop(Channel.DEFAULT); - } catch (ChannelException e) { - logger.log(Level.WARNING, e.getMessage(), e); - } - map = null; - } - } - - public void addEndpoint(Endpoint endpoint) { - map.put(endpoint.getURI(), endpoint); - logger.info("Add endpoint - " + endpoint); - } - - public void addEndpointReference(EndpointReference endpointReference) { - endpointreferences.add(endpointReference); - logger.fine("Add endpoint reference - " + endpointReference); - } - - public void addListener(EndpointListener listener) { - listeners.add(listener); - } - - /** - * Parse the component/service/binding URI into an array of parts (componentURI, serviceName, bindingName) - * @param uri - * @return - */ - private String[] parse(String uri) { - String[] names = new String[3]; - int index = uri.lastIndexOf('#'); - if (index == -1) { - names[0] = uri; - } else { - names[0] = uri.substring(0, index); - String str = uri.substring(index + 1); - if (str.startsWith("service-binding(") && str.endsWith(")")) { - str = str.substring("service-binding(".length(), str.length() - 1); - String[] parts = str.split("/"); - if (parts.length != 2) { - throw new IllegalArgumentException("Invalid service-binding URI: " + uri); - } - names[1] = parts[0]; - names[2] = parts[1]; - } else if (str.startsWith("service(") && str.endsWith(")")) { - str = str.substring("service(".length(), str.length() - 1); - names[1] = str; - } else { - throw new IllegalArgumentException("Invalid component/service/binding URI: " + uri); - } - } - return names; - } - - private boolean matches(String target, String uri) { - String[] parts1 = parse(target); - String[] parts2 = parse(uri); - for (int i = 0; i < parts1.length; i++) { - if (parts1[i] == null || parts1[i].equals(parts2[i])) { - continue; - } else { - return false; - } - } - return true; - } - - public List findEndpoint(EndpointReference endpointReference) { - List foundEndpoints = new ArrayList(); - - logger.fine("Find endpoint for reference - " + endpointReference); - - if (endpointReference.getReference() != null) { - Endpoint targetEndpoint = endpointReference.getTargetEndpoint(); - for (Object v : map.values()) { - Endpoint endpoint = (Endpoint)v; - // TODO: implement more complete matching - logger.fine("Matching against - " + endpoint); - if (matches(targetEndpoint.getURI(), endpoint.getURI())) { - MapEntry entry = map.getInternal(endpoint.getURI()); - if (!isLocal(entry)) { - endpoint.setRemote(true); - } - // if (!entry.isPrimary()) { - endpoint.setExtensionPointRegistry(registry); - // } - foundEndpoints.add(endpoint); - logger.fine("Found endpoint with matching service - " + endpoint); - } - // else the service name doesn't match - } - } - return foundEndpoints; - } - - private boolean isLocal(MapEntry entry) { - return entry.getPrimary().equals(map.getChannel().getLocalMember(false)); - } - - public List findEndpointReference(Endpoint endpoint) { - return endpointreferences; - } - - public Endpoint getEndpoint(String uri) { - return (Endpoint)map.get(uri); - } - - public List getEndpointRefereneces() { - return endpointreferences; - } - - public List getEndpoints() { - return new ArrayList(map.values()); - } - - public List getListeners() { - return listeners; - } - - public void removeEndpoint(Endpoint endpoint) { - map.remove(endpoint.getURI()); - logger.info("Remove endpoint - " + endpoint); - } - - public void removeEndpointReference(EndpointReference endpointReference) { - endpointreferences.remove(endpointReference); - logger.fine("Remove endpoint reference - " + endpointReference); - } - - public void removeListener(EndpointListener listener) { - listeners.remove(listener); - } - - public void replicate(boolean complete) { - map.replicate(complete); - } - - public void updateEndpoint(String uri, Endpoint endpoint) { - Endpoint oldEndpoint = getEndpoint(uri); - if (oldEndpoint == null) { - throw new IllegalArgumentException("Endpoint is not found: " + uri); - } - map.put(endpoint.getURI(), endpoint); - } - - public void entryAdded(Object key, Object value) { - MapEntry entry = (MapEntry)value; - if (!isLocal(entry)) { - logger.info(id + " Remote endpoint added: " + entry.getValue()); - } - Endpoint newEp = (Endpoint)entry.getValue(); - for (EndpointListener listener : listeners) { - listener.endpointAdded(newEp); - } - } - - public void entryRemoved(Object key, Object value) { - MapEntry entry = (MapEntry)value; - if (!isLocal(entry)) { - logger.info(id + " Remote endpoint removed: " + entry.getValue()); - } - Endpoint oldEp = (Endpoint)entry.getValue(); - for (EndpointListener listener : listeners) { - listener.endpointRemoved(oldEp); - } - } - - public void entryUpdated(Object key, Object oldValue, Object newValue) { - MapEntry oldEntry = (MapEntry)oldValue; - MapEntry newEntry = (MapEntry)newValue; - if (!isLocal(newEntry)) { - logger.info(id + " Remote endpoint updated: " + newEntry.getValue()); - } - Endpoint oldEp = (Endpoint)oldEntry.getValue(); - Endpoint newEp = (Endpoint)newEntry.getValue(); - for (EndpointListener listener : listeners) { - listener.endpointUpdated(oldEp, newEp); - } - } - - public static void main(String[] args) throws Exception { - //create a channel - GroupChannel channel = new GroupChannel(); - McastService mcastService = (McastService)channel.getMembershipService(); - mcastService.setPort(MULTICAST_PORT); - mcastService.setAddress(MULTICAST_ADDRESS); - - InetAddress localhost = InetAddress.getLocalHost(); - - // REVIEW: In my case, there are multiple IP addresses - // One for the WIFI and the other one for VPN. For some reason the VPN one doesn't support - // Multicast - - // You can use "route add 228.0.0.0 mask 252.0.0.0 192.168.1.100" - mcastService.setBind(getBindAddress()); - channel.start(Channel.DEFAULT); - ReplicatedMap map = new ReplicatedMap(null, channel, 50, "01", null); - map.put(UUID.randomUUID().toString(), localhost.getHostAddress()); - for (int i = 0; i < 4; i++) { - Thread.sleep(3000); - System.out.println(localhost + ": " + map.keySet()); - } - for (Object e : map.entrySetFull()) { - Map.Entry en = (Map.Entry)e; - AbstractReplicatedMap.MapEntry entry = (AbstractReplicatedMap.MapEntry)en.getValue(); - System.out.println(entry); - } - map.breakdown(); - channel.stop(Channel.DEFAULT); - } - - private static String getBindAddress() { - try { - Enumeration nis = NetworkInterface.getNetworkInterfaces(); - while (nis.hasMoreElements()) { - NetworkInterface ni = nis.nextElement(); - // The following APIs require JDK 1.6 - /* - if (ni.isLoopback() || !ni.isUp() || !ni.supportsMulticast()) { - continue; - } - */ - Enumeration ips = ni.getInetAddresses(); - if (!ips.hasMoreElements()) { - continue; - } - while (ips.hasMoreElements()) { - InetAddress addr = ips.nextElement(); - if (addr.isLoopbackAddress()) { - continue; - } - return addr.getHostAddress(); - } - } - return InetAddress.getLocalHost().getHostAddress(); - } catch (Exception e) { - logger.log(Level.SEVERE, e.getMessage(), e); - return null; - } - } - -} diff --git a/branches/sca-java-2.0-M4/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java b/branches/sca-java-2.0-M4/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java deleted file mode 100644 index 762407604d..0000000000 --- a/branches/sca-java-2.0-M4/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tuscany.sca.endpoint.tribes; - -import java.io.Serializable; - -import org.apache.catalina.tribes.Channel; -import org.apache.catalina.tribes.ChannelException; -import org.apache.catalina.tribes.ChannelListener; -import org.apache.catalina.tribes.Member; -import org.apache.catalina.tribes.MembershipListener; -import org.apache.catalina.tribes.group.RpcCallback; - -/** - * This file is copied from: - * https://svn.apache.org/repos/asf/tomcat/tc6.0.x/tags/TOMCAT_6_0_20/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java - * - */ -public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback, ChannelListener, MembershipListener { - private static final long serialVersionUID = -6318779627600581121L; - protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(ReplicatedMap.class); - - //------------------------------------------------------------------------------ - // CONSTRUCTORS / DESTRUCTORS - //------------------------------------------------------------------------------ - /** - * Creates a new map - * @param channel The channel to use for communication - * @param timeout long - timeout for RPC messags - * @param mapContextName String - unique name for this map, to allow multiple maps per channel - * @param initialCapacity int - the size of this map, see HashMap - * @param loadFactor float - load factor, see HashMap - */ - public ReplicatedMap(MapOwner owner, - Channel channel, - long timeout, - String mapContextName, - int initialCapacity, - float loadFactor, - ClassLoader[] cls) { - super(owner, channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT, cls); - } - - /** - * Creates a new map - * @param channel The channel to use for communication - * @param timeout long - timeout for RPC messags - * @param mapContextName String - unique name for this map, to allow multiple maps per channel - * @param initialCapacity int - the size of this map, see HashMap - */ - public ReplicatedMap(MapOwner owner, - Channel channel, - long timeout, - String mapContextName, - int initialCapacity, - ClassLoader[] cls) { - super(owner, channel, timeout, mapContextName, initialCapacity, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, - Channel.SEND_OPTIONS_DEFAULT, cls); - } - - /** - * Creates a new map - * @param channel The channel to use for communication - * @param timeout long - timeout for RPC messags - * @param mapContextName String - unique name for this map, to allow multiple maps per channel - */ - public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, ClassLoader[] cls) { - super(owner, channel, timeout, mapContextName, AbstractReplicatedMap.DEFAULT_INITIAL_CAPACITY, - AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT, cls); - } - - //------------------------------------------------------------------------------ - // METHODS TO OVERRIDE - //------------------------------------------------------------------------------ - protected int getStateMessageType() { - return AbstractReplicatedMap.MapMessage.MSG_STATE_COPY; - } - - /** - * publish info about a map pair (key/value) to other nodes in the cluster - * @param key Object - * @param value Object - * @return Member - the backup node - * @throws ChannelException - */ - protected Member[] publishEntryInfo(Object key, Object value, Member[] backupNodes) throws ChannelException { - if (!(key instanceof Serializable && value instanceof Serializable)) - return new Member[0]; - //select a backup node - Member[] backup = getMapMembers(); - - if (backup == null || backup.length == 0) - return null; - - // Set the receivers to these members that are not in the backup nodes yet - Member[] members = backup; - if (backupNodes != null) { - members = getMapMembersExcl(backupNodes); - } - - //publish the data out to all nodes - MapMessage msg = - new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable)key, (Serializable)value, - null, channel.getLocalMember(false), backup); - - getChannel().send(members, msg, getChannelSendOptions()); - - return backup; - } - -} -- cgit v1.2.3