diff options
Diffstat (limited to 'sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes')
11 files changed, 0 insertions, 2774 deletions
diff --git a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/META-INF/MANIFEST.MF b/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/META-INF/MANIFEST.MF deleted file mode 100644 index 06df55ef38..0000000000 --- a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/META-INF/MANIFEST.MF +++ /dev/null @@ -1,28 +0,0 @@ -Manifest-Version: 1.0
-Private-Package: org.apache.tuscany.sca.xsd.impl;version="2.0.0"
-SCA-Version: 1.1
-Bundle-Name: Apache Tuscany SCA Tomcat Tribes Based EndPoint Registry
-Bundle-Vendor: The Apache Software Foundation
-Bundle-Version: 2.0.0
-Bundle-ManifestVersion: 2
-Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt
-Bundle-Description: Apache Tuscany SCA XSD Model
-Bundle-SymbolicName: org.apache.tuscany.sca.endpoint.tribes
-Bundle-DocURL: http://www.apache.org/
-Bundle-RequiredExecutionEnvironment: J2SE-1.5,JavaSE-1.6 -Import-Package: org.apache.catalina.tribes,
- org.apache.catalina.tribes.group,
- org.apache.catalina.tribes.group.interceptors,
- org.apache.catalina.tribes.io,
- org.apache.catalina.tribes.membership,
- org.apache.catalina.tribes.tipis,
- org.apache.catalina.tribes.transport,
- org.apache.catalina.tribes.util,
- org.apache.juli.logging;resolution:=optional,
- org.apache.tuscany.sca.assembly;version="2.0.0",
- org.apache.tuscany.sca.core;version="2.0.0",
- org.apache.tuscany.sca.core.assembly.impl;scope=internal;version="2.0.0";resolution:=optional,
- org.apache.tuscany.sca.management;version="2.0.0",
- org.apache.tuscany.sca.policy;version="2.0.0",
- org.apache.tuscany.sca.runtime;version="2.0.0"
-Export-Package: org.apache.tuscany.sca.endpoint.tribes;version="2.0.0"
diff --git a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/pom.xml b/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/pom.xml deleted file mode 100644 index 9c547d245e..0000000000 --- a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/pom.xml +++ /dev/null @@ -1,65 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. ---> -<project> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.tuscany.sca</groupId> - <artifactId>tuscany-modules</artifactId> - <version>2.0-Beta3-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - <artifactId>tuscany-endpoint-tribes</artifactId> - <name>Apache Tuscany SCA Tomcat Tribes Based EndPoint Registry</name> - - <dependencies> - <dependency> - <groupId>org.apache.tomcat</groupId> - <artifactId>tribes</artifactId> - <!-- DO NOT upgrade to 6.0.20: https://issues.apache.org/bugzilla/show_bug.cgi?id=47419 --> - <version>6.0.26</version> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>org.apache.tuscany.sca</groupId> - <artifactId>tuscany-core-spi</artifactId> - <version>2.0-Beta3-SNAPSHOT</version> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>org.apache.tuscany.sca</groupId> - <artifactId>tuscany-core</artifactId> - <version>2.0-Beta3-SNAPSHOT</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.tuscany.sca</groupId> - <artifactId>tuscany-deployment</artifactId> - <version>2.0-Beta3-SNAPSHOT</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.tuscany.sca</groupId> - <artifactId>tuscany-implementation-java-runtime</artifactId> - <version>2.0-Beta3-SNAPSHOT</version> - <scope>test</scope> - </dependency> - </dependencies> - -</project> diff --git a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java b/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java deleted file mode 100644 index fe683af025..0000000000 --- a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java +++ /dev/null @@ -1,1564 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tuscany.sca.endpoint.tribes; - -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.io.Serializable; -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.catalina.tribes.Channel; -import org.apache.catalina.tribes.ChannelException; -import org.apache.catalina.tribes.ChannelListener; -import org.apache.catalina.tribes.Heartbeat; -import org.apache.catalina.tribes.Member; -import org.apache.catalina.tribes.MembershipListener; -import org.apache.catalina.tribes.group.Response; -import org.apache.catalina.tribes.group.RpcCallback; -import org.apache.catalina.tribes.group.RpcChannel; -import org.apache.catalina.tribes.io.XByteBuffer; -import org.apache.catalina.tribes.membership.MemberImpl; -import org.apache.catalina.tribes.tipis.ReplicatedMapEntry; -import org.apache.catalina.tribes.util.Arrays; -import org.apache.juli.logging.Log; -import org.apache.juli.logging.LogFactory; - -/** - * This file is copied from: - * https://svn.apache.org/repos/asf/tomcat/tc6.0.x/tags/TOMCAT_6_0_20/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java - * - * We need to intercept the put/remove calls on the base Map to fire events - */ -public abstract class AbstractReplicatedMap extends MapStore implements RpcCallback, ChannelListener, - MembershipListener, Heartbeat { - private static final long serialVersionUID = -2703358020113899074L; - - protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class); - - /** - * The default initial capacity - MUST be a power of two. - */ - public static final int DEFAULT_INITIAL_CAPACITY = 16; - - /** - * The load factor used when none specified in constructor. - **/ - public static final float DEFAULT_LOAD_FACTOR = 0.75f; - - /** - * Used to identify the map - */ - final String chset = "ISO-8859-1"; - - //------------------------------------------------------------------------------ - // INSTANCE VARIABLES - //------------------------------------------------------------------------------ - protected abstract int getStateMessageType(); - - /** - * Timeout for RPC messages, how long we will wait for a reply - */ - protected transient long rpcTimeout = 5000; - /** - * Reference to the channel for sending messages - */ - protected transient Channel channel; - /** - * The RpcChannel to send RPC messages through - */ - protected transient RpcChannel rpcChannel; - /** - * The Map context name makes this map unique, this - * allows us to have more than one map shared - * through one channel - */ - protected transient byte[] mapContextName; - /** - * Has the state been transferred - */ - protected transient boolean stateTransferred = false; - /** - * Simple lock object for transfers - */ - protected transient Object stateMutex = new Object(); - /** - * A list of members in our map - */ - protected transient Map<Member, Long> mapMembers = new HashMap<Member, Long>(); - /** - * Our default send options - */ - protected transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT; - /** - * The owner of this map, ala a SessionManager for example - */ - protected transient MapOwner mapOwner; - /** - * External class loaders if serialization and deserialization is to be performed successfully. - */ - protected transient ClassLoader[] externalLoaders; - - /** - * The node we are currently backing up data to, this index will rotate - * on a round robin basis - */ - protected transient int currentNode = 0; - - /** - * Since the map keeps internal membership - * this is the timeout for a ping message to be responded to - * If a remote map doesn't respond within this timeframe, - * its considered dead. - */ - protected transient long accessTimeout = 5000; - - /** - * Readable string of the mapContextName value - */ - protected transient String mapname = ""; - - //------------------------------------------------------------------------------ - // map owner interface - //------------------------------------------------------------------------------ - - public static interface MapOwner { - public void objectMadePrimay(Object key, Object value); - } - - //------------------------------------------------------------------------------ - // CONSTRUCTORS - //------------------------------------------------------------------------------ - - /** - * Creates a new map - * @param channel The channel to use for communication - * @param timeout long - timeout for RPC messags - * @param mapContextName String - unique name for this map, to allow multiple maps per channel - * @param initialCapacity int - the size of this map, see HashMap - * @param loadFactor float - load factor, see HashMap - * @param cls - a list of classloaders to be used for deserialization of objects. - */ - public AbstractReplicatedMap(MapOwner owner, - Channel channel, - long timeout, - String mapContextName, - int initialCapacity, - float loadFactor, - int channelSendOptions, - ClassLoader[] cls) { - super(initialCapacity, loadFactor, 15); - init(owner, channel, mapContextName, timeout, channelSendOptions, cls); - - } - - /** - * Helper methods, wraps a single member in an array - * @param m Member - * @return Member[] - */ - protected Member[] wrap(Member m) { - if (m == null) - return new Member[0]; - else - return new Member[] {m}; - } - - /** - * Initializes the map by creating the RPC channel, registering itself as a channel listener - * This method is also responsible for initiating the state transfer - * @param owner Object - * @param channel Channel - * @param mapContextName String - * @param timeout long - * @param channelSendOptions int - * @param cls ClassLoader[] - */ - protected void init(MapOwner owner, - Channel channel, - String mapContextName, - long timeout, - int channelSendOptions, - ClassLoader[] cls) { - log.info("Initializing AbstractReplicatedMap with context name:" + mapContextName); - this.mapOwner = owner; - this.externalLoaders = cls; - this.channelSendOptions = channelSendOptions; - this.channel = channel; - this.rpcTimeout = timeout; - - try { - this.mapname = mapContextName; - //unique context is more efficient if it is stored as bytes - this.mapContextName = mapContextName.getBytes(chset); - } catch (UnsupportedEncodingException x) { - log.warn("Unable to encode mapContextName[" + mapContextName - + "] using getBytes(" - + chset - + ") using default getBytes()", x); - this.mapContextName = mapContextName.getBytes(); - } - if (log.isTraceEnabled()) - log.trace("Created Lazy Map with name:" + mapContextName - + ", bytes:" - + Arrays.toString(this.mapContextName)); - - //create an rpc channel and add the map as a listener - this.rpcChannel = new RpcChannel(this.mapContextName, channel, this); - //add this map as a message listener - this.channel.addChannelListener(this); - //listen for membership notifications - this.channel.addMembershipListener(this); - - try { - //broadcast our map, this just notifies other members of our existence - broadcast(MapMessage.MSG_INIT, true); - //transfer state from another map - transferState(); - //state is transferred, we are ready for messaging - broadcast(MapMessage.MSG_START, true); - } catch (ChannelException x) { - log.warn("Unable to send map start message."); - throw new RuntimeException("Unable to start replicated map.", x); - } - } - - /** - * Sends a ping out to all the members in the cluster, not just map members - * that this map is alive. - * @param timeout long - * @throws ChannelException - */ - protected void ping(long timeout) throws ChannelException { - //send out a map membership message, only wait for the first reply - MapMessage msg = - new MapMessage(this.mapContextName, MapMessage.MSG_INIT, false, null, null, null, channel - .getLocalMember(false), null); - if (channel.getMembers().length > 0) { - //send a ping, wait for all nodes to reply - Response[] resp = - rpcChannel.send(channel.getMembers(), - msg, - RpcChannel.ALL_REPLY, - (channelSendOptions), - (int)accessTimeout); - for (int i = 0; i < resp.length; i++) { - memberAlive(resp[i].getSource()); - } //for - } - //update our map of members, expire some if we didn't receive a ping back - synchronized (mapMembers) { - Iterator it = mapMembers.entrySet().iterator(); - long now = System.currentTimeMillis(); - while (it.hasNext()) { - Map.Entry entry = (Map.Entry)it.next(); - long access = ((Long)entry.getValue()).longValue(); - if ((now - access) > timeout) { - it.remove(); - memberDisappeared((Member)entry.getKey()); - } - } - }//synch - } - - /** - * We have received a member alive notification - * @param member Member - */ - protected void memberAlive(Member member) { - synchronized (mapMembers) { - if (!mapMembers.containsKey(member)) { - mapMemberAdded(member); - } //end if - mapMembers.put(member, new Long(System.currentTimeMillis())); - } - } - - /** - * Helper method to broadcast a message to all members in a channel - * @param msgtype int - * @param rpc boolean - * @throws ChannelException - */ - protected void broadcast(int msgtype, boolean rpc) throws ChannelException { - //send out a map membership message, only wait for the first reply - MapMessage msg = - new MapMessage(this.mapContextName, msgtype, false, null, null, null, channel.getLocalMember(false), null); - if (rpc) { - Response[] resp = - rpcChannel.send(channel.getMembers(), msg, RpcChannel.FIRST_REPLY, (channelSendOptions), rpcTimeout); - for (int i = 0; i < resp.length; i++) { - mapMemberAdded(resp[i].getSource()); - messageReceived(resp[i].getMessage(), resp[i].getSource()); - } - } else { - channel.send(channel.getMembers(), msg, channelSendOptions); - } - } - - public void breakdown() { - finalize(); - } - - public void finalize() { - try { - broadcast(MapMessage.MSG_STOP, false); - } catch (Exception ignore) { - } - //cleanup - if (this.rpcChannel != null) { - this.rpcChannel.breakdown(); - } - if (this.channel != null) { - this.channel.removeChannelListener(this); - this.channel.removeMembershipListener(this); - } - this.rpcChannel = null; - this.channel = null; - this.mapMembers.clear(); - super.clear(); - this.stateTransferred = false; - this.externalLoaders = null; - } - - public int hashCode() { - return Arrays.hashCode(this.mapContextName); - } - - public boolean equals(Object o) { - if (o == null) - return false; - if (!(o instanceof AbstractReplicatedMap)) - return false; - if (!(o.getClass().equals(this.getClass()))) - return false; - AbstractReplicatedMap other = (AbstractReplicatedMap)o; - return Arrays.equals(mapContextName, other.mapContextName); - } - - //------------------------------------------------------------------------------ - // GROUP COM INTERFACES - //------------------------------------------------------------------------------ - public Member[] getMapMembers(Map<Member, Long> members) { - synchronized (members) { - Member[] result = new Member[members.size()]; - members.keySet().toArray(result); - return result; - } - } - - public Member[] getMapMembers() { - return getMapMembers(this.mapMembers); - } - - public Member[] getMapMembersExcl(Member[] exclude) { - synchronized (mapMembers) { - Map<Member, Long> list = (Map<Member, Long>)((HashMap<Member, Long>)mapMembers).clone(); - for (int i = 0; i < exclude.length; i++) - list.remove(exclude[i]); - return getMapMembers(list); - } - } - - /** - * Replicates any changes to the object since the last time - * The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated<br> - * @param complete - if set to true, the object is replicated to its backup - * if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will - * be replicated - */ - public void replicate(Object key, boolean complete) { - if (log.isTraceEnabled()) - log.trace("Replicate invoked on key:" + key); - MapEntry entry = (MapEntry)super.get(key); - if (entry == null) - return; - if (!entry.isSerializable()) - return; - if (entry != null && entry.isPrimary() && entry.getBackupNodes() != null && entry.getBackupNodes().length > 0) { - Object value = entry.getValue(); - //check to see if we need to replicate this object isDirty()||complete - boolean repl = complete || ((value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDirty()); - - if (!repl) { - if (log.isTraceEnabled()) - log.trace("Not replicating:" + key + ", no change made"); - - return; - } - //check to see if the message is diffable - boolean diff = ((value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDiffable()); - MapMessage msg = null; - if (diff) { - ReplicatedMapEntry rentry = (ReplicatedMapEntry)entry.getValue(); - try { - rentry.lock(); - //construct a diff message - msg = - new MapMessage(mapContextName, MapMessage.MSG_BACKUP, true, (Serializable)entry.getKey(), null, - rentry.getDiff(), entry.getPrimary(), entry.getBackupNodes()); - } catch (IOException x) { - log.error("Unable to diff object. Will replicate the entire object instead.", x); - } finally { - rentry.unlock(); - } - - } - if (msg == null) { - //construct a complete - msg = - new MapMessage(mapContextName, MapMessage.MSG_BACKUP, false, (Serializable)entry.getKey(), - (Serializable)entry.getValue(), null, entry.getPrimary(), entry.getBackupNodes()); - - } - try { - if (channel != null && entry.getBackupNodes() != null && entry.getBackupNodes().length > 0) { - channel.send(entry.getBackupNodes(), msg, channelSendOptions); - } - } catch (ChannelException x) { - log.error("Unable to replicate data.", x); - } - } //end if - - } - - /** - * This can be invoked by a periodic thread to replicate out any changes. - * For maps that don't store objects that implement ReplicatedMapEntry, this - * method should be used infrequently to avoid large amounts of data transfer - * @param complete boolean - */ - public void replicate(boolean complete) { - Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry<Object, Object> e = i.next(); - replicate(e.getKey(), complete); - } //while - - } - - public void transferState() { - try { - Member[] members = getMapMembers(); - Member backup = members.length > 0 ? (Member)members[0] : null; - if (backup != null) { - MapMessage msg = - new MapMessage(mapContextName, getStateMessageType(), false, null, null, null, null, null); - Response[] resp = - rpcChannel.send(new Member[] {backup}, msg, RpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout); - if (resp.length > 0) { - synchronized (stateMutex) { - msg = (MapMessage)resp[0].getMessage(); - msg.deserialize(getExternalLoaders()); - List list = (List)msg.getValue(); - for (int i = 0; i < list.size(); i++) { - messageReceived((Serializable)list.get(i), resp[0].getSource()); - } //for - } - } else { - log.warn("Transfer state, 0 replies, probably a timeout."); - } - } - } catch (ChannelException x) { - log.error("Unable to transfer LazyReplicatedMap state.", x); - } catch (IOException x) { - log.error("Unable to transfer LazyReplicatedMap state.", x); - } catch (ClassNotFoundException x) { - log.error("Unable to transfer LazyReplicatedMap state.", x); - } - stateTransferred = true; - } - - /** - * @todo implement state transfer - * @param msg Serializable - * @return Serializable - null if no reply should be sent - */ - public Serializable replyRequest(Serializable msg, final Member sender) { - if (!(msg instanceof MapMessage)) - return null; - MapMessage mapmsg = (MapMessage)msg; - - //map init request - if (mapmsg.getMsgType() == MapMessage.MSG_INIT) { - mapmsg.setPrimary(channel.getLocalMember(false)); - return mapmsg; - } - - //map start request - if (mapmsg.getMsgType() == MapMessage.MSG_START) { - mapmsg.setPrimary(channel.getLocalMember(false)); - mapMemberAdded(sender); - return mapmsg; - } - - //backup request - if (mapmsg.getMsgType() == MapMessage.MSG_RETRIEVE_BACKUP) { - MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); - if (entry == null || (!entry.isSerializable())) - return null; - mapmsg.setValue((Serializable)entry.getValue()); - return mapmsg; - } - - //state transfer request - if (mapmsg.getMsgType() == MapMessage.MSG_STATE || mapmsg.getMsgType() == MapMessage.MSG_STATE_COPY) { - synchronized (stateMutex) { //make sure we dont do two things at the same time - ArrayList<MapMessage> list = new ArrayList<MapMessage>(); - Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry<Object, Object> e = i.next(); - MapEntry entry = (MapEntry)super.get(e.getKey()); - if (entry != null && entry.isSerializable()) { - boolean copy = (mapmsg.getMsgType() == MapMessage.MSG_STATE_COPY); - MapMessage me = - new MapMessage(mapContextName, copy ? MapMessage.MSG_COPY : MapMessage.MSG_PROXY, false, - (Serializable)entry.getKey(), copy ? (Serializable)entry.getValue() : null, - null, entry.getPrimary(), entry.getBackupNodes()); - list.add(me); - } - } - mapmsg.setValue(list); - return mapmsg; - - } //synchronized - } - - return null; - - } - - /** - * If the reply has already been sent to the requesting thread, - * the rpc callback can handle any data that comes in after the fact. - * @param msg Serializable - * @param sender Member - */ - public void leftOver(Serializable msg, Member sender) { - //left over membership messages - if (!(msg instanceof MapMessage)) - return; - - MapMessage mapmsg = (MapMessage)msg; - try { - mapmsg.deserialize(getExternalLoaders()); - if (mapmsg.getMsgType() == MapMessage.MSG_START) { - mapMemberAdded(mapmsg.getPrimary()); - } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) { - memberAlive(mapmsg.getPrimary()); - } - } catch (IOException x) { - log.error("Unable to deserialize MapMessage.", x); - } catch (ClassNotFoundException x) { - log.error("Unable to deserialize MapMessage.", x); - } - } - - public void messageReceived(Serializable msg, Member sender) { - if (!(msg instanceof MapMessage)) - return; - - MapMessage mapmsg = (MapMessage)msg; - if (log.isTraceEnabled()) { - log.trace("Map[" + mapname + "] received message:" + mapmsg); - } - - try { - mapmsg.deserialize(getExternalLoaders()); - } catch (IOException x) { - log.error("Unable to deserialize MapMessage.", x); - return; - } catch (ClassNotFoundException x) { - log.error("Unable to deserialize MapMessage.", x); - return; - } - if (log.isTraceEnabled()) - log.trace("Map message received from:" + sender.getName() + " msg:" + mapmsg); - if (mapmsg.getMsgType() == MapMessage.MSG_START) { - mapMemberAdded(mapmsg.getPrimary()); - } - - if (mapmsg.getMsgType() == MapMessage.MSG_STOP) { - memberDisappeared(mapmsg.getPrimary()); - } - - if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) { - MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); - if (entry == null) { - entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue()); - entry.setBackup(false); - entry.setProxy(true); - entry.setBackupNodes(mapmsg.getBackupNodes()); - entry.setPrimary(mapmsg.getPrimary()); - super.put(entry.getKey(), entry); - } else { - entry.setProxy(true); - entry.setBackup(false); - entry.setBackupNodes(mapmsg.getBackupNodes()); - entry.setPrimary(mapmsg.getPrimary()); - } - } - - if (mapmsg.getMsgType() == MapMessage.MSG_REMOVE) { - super.remove(mapmsg.getKey()); - } - - if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP || mapmsg.getMsgType() == MapMessage.MSG_COPY) { - MapEntry entry = (MapEntry)super.get(mapmsg.getKey()); - if (entry == null) { - entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue()); - entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP); - entry.setProxy(false); - entry.setBackupNodes(mapmsg.getBackupNodes()); - entry.setPrimary(mapmsg.getPrimary()); - if (mapmsg.getValue() != null && mapmsg.getValue() instanceof ReplicatedMapEntry) { - ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner()); - } - } else { - entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP); - entry.setProxy(false); - entry.setBackupNodes(mapmsg.getBackupNodes()); - entry.setPrimary(mapmsg.getPrimary()); - if (entry.getValue() instanceof ReplicatedMapEntry) { - ReplicatedMapEntry diff = (ReplicatedMapEntry)entry.getValue(); - if (mapmsg.isDiff()) { - try { - diff.lock(); - diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length); - } catch (Exception x) { - log.error("Unable to apply diff to key:" + entry.getKey(), x); - } finally { - diff.unlock(); - } - } else { - if (mapmsg.getValue() != null) - entry.setValue(mapmsg.getValue()); - ((ReplicatedMapEntry)entry.getValue()).setOwner(getMapOwner()); - } //end if - } else if (mapmsg.getValue() instanceof ReplicatedMapEntry) { - ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue(); - re.setOwner(getMapOwner()); - entry.setValue(re); - } else { - if (mapmsg.getValue() != null) - entry.setValue(mapmsg.getValue()); - } //end if - } //end if - super.put(entry.getKey(), entry); - } //end if - } - - public boolean accept(Serializable msg, Member sender) { - boolean result = false; - if (msg instanceof MapMessage) { - if (log.isTraceEnabled()) - log.trace("Map[" + mapname + "] accepting...." + msg); - result = Arrays.equals(mapContextName, ((MapMessage)msg).getMapId()); - if (log.isTraceEnabled()) - log.trace("Msg[" + mapname + "] accepted[" + result + "]...." + msg); - } - return result; - } - - public void mapMemberAdded(Member member) { - Member self = getChannel().getLocalMember(false); - if (member.equals(self)) - return; - boolean memberAdded = false; - //select a backup node if we don't have one - synchronized (mapMembers) { - if (!mapMembers.containsKey(member)) { - mapMembers.put(member, new Long(System.currentTimeMillis())); - memberAdded = true; - } - } - if (memberAdded) { - synchronized (stateMutex) { - Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry<Object, Object> e = i.next(); - MapEntry entry = (MapEntry)super.get(e.getKey()); - if (entry == null) - continue; - // if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) { - // [rfeng] Change the behavior to replicate to all nodes - if (entry.isPrimary() && self.equals(entry.getPrimary())) { - try { - Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); - entry.setBackupNodes(backup); - entry.setPrimary(self); - } catch (ChannelException x) { - log.error("Unable to select backup node.", x); - } //catch - } //end if - } //while - } //synchronized - }//end if - } - - public boolean inSet(Member m, Member[] set) { - if (set == null) - return false; - boolean result = false; - for (int i = 0; i < set.length && (!result); i++) - if (m.equals(set[i])) - result = true; - return result; - } - - public Member[] excludeFromSet(Member[] mbrs, Member[] set) { - List<Member> result = new ArrayList<Member>(); - for (int i = 0; i < set.length; i++) { - boolean include = true; - for (int j = 0; j < mbrs.length; j++) - if (mbrs[j].equals(set[i])) - include = false; - if (include) - result.add(set[i]); - } - return (Member[])result.toArray(new Member[result.size()]); - } - - public void memberAdded(Member member) { - //do nothing - } - - public void memberDisappeared(Member member) { - boolean removed = false; - synchronized (mapMembers) { - removed = (mapMembers.remove(member) != null); - if (!removed) { - if (log.isDebugEnabled()) - log.debug("Member[" + member + "] disappeared, but was not present in the map."); - return; //the member was not part of our map. - } - } - - Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry<Object, Object> e = i.next(); - MapEntry entry = (MapEntry)super.get(e.getKey()); - if (entry == null) - continue; - if (entry.isPrimary() && inSet(member, entry.getBackupNodes())) { - if (log.isDebugEnabled()) - log.debug("[1] Primary choosing a new backup"); - try { - Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); - entry.setBackupNodes(backup); - entry.setPrimary(channel.getLocalMember(false)); - } catch (ChannelException x) { - log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x); - } - } else if (member.equals(entry.getPrimary())) { - if (log.isDebugEnabled()) - log.debug("[2] Primary disappeared"); - entry.setPrimary(null); - } //end if - - if (entry.isProxy() && entry.getPrimary() == null - && entry.getBackupNodes() != null - && entry.getBackupNodes().length == 1 - && entry.getBackupNodes()[0].equals(member)) { - //remove proxies that have no backup nor primaries - if (log.isDebugEnabled()) - log.debug("[3] Removing orphaned proxy"); - i.remove(); - } else if (entry.getPrimary() == null && entry.isBackup() - && entry.getBackupNodes() != null - && entry.getBackupNodes().length == 1 - && entry.getBackupNodes()[0].equals(channel.getLocalMember(false))) { - try { - if (log.isDebugEnabled()) - log.debug("[4] Backup becoming primary"); - entry.setPrimary(channel.getLocalMember(false)); - entry.setBackup(false); - entry.setProxy(false); - Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); - entry.setBackupNodes(backup); - if (mapOwner != null) - mapOwner.objectMadePrimay(entry.getKey(), entry.getValue()); - - } catch (ChannelException x) { - log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x); - } - } - - } //while - } - - public int getNextBackupIndex() { - int size = mapMembers.size(); - if (mapMembers.size() == 0) - return -1; - int node = currentNode++; - if (node >= size) { - node = 0; - currentNode = 0; - } - return node; - } - - public Member getNextBackupNode() { - Member[] members = getMapMembers(); - int node = getNextBackupIndex(); - if (members.length == 0 || node == -1) - return null; - if (node >= members.length) - node = 0; - return members[node]; - } - - protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException; - - public void heartbeat() { - try { - ping(accessTimeout); - } catch (Exception x) { - log.error("Unable to send AbstractReplicatedMap.ping message", x); - } - } - - //------------------------------------------------------------------------------ - // METHODS TO OVERRIDE - //------------------------------------------------------------------------------ - - /** - * Removes an object from this map, it will also remove it from - * - * @param key Object - * @return Object - */ - public Object remove(Object key) { - return remove(key, true); - } - - public Object remove(Object key, boolean notify) { - MapEntry entry = (MapEntry)super.remove(key); - - try { - if (getMapMembers().length > 0 && notify) { - MapMessage msg = - new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable)key, null, null, - null, null); - getChannel().send(getMapMembers(), msg, getChannelSendOptions()); - } - } catch (ChannelException x) { - log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation", x); - } - return entry != null ? entry.getValue() : null; - } - - public MapEntry getInternal(Object key) { - return (MapEntry)super.get(key); - } - - public Object get(Object key) { - MapEntry entry = (MapEntry)super.get(key); - if (log.isTraceEnabled()) - log.trace("Requesting id:" + key + " entry:" + entry); - if (entry == null) - return null; - if (!entry.isPrimary()) { - //if the message is not primary, we need to retrieve the latest value - try { - Member[] backup = null; - MapMessage msg = null; - if (!entry.isBackup()) { - //make sure we don't retrieve from ourselves - msg = - new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false, (Serializable)key, - null, null, null, null); - Response[] resp = - getRpcChannel().send(entry.getBackupNodes(), - msg, - RpcChannel.FIRST_REPLY, - Channel.SEND_OPTIONS_DEFAULT, - getRpcTimeout()); - if (resp == null || resp.length == 0) { - //no responses - log.warn("Unable to retrieve remote object for key:" + key); - return null; - } - msg = (MapMessage)resp[0].getMessage(); - msg.deserialize(getExternalLoaders()); - backup = entry.getBackupNodes(); - if (entry.getValue() instanceof ReplicatedMapEntry) { - ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue(); - val.setOwner(getMapOwner()); - } - if (msg.getValue() != null) - entry.setValue(msg.getValue()); - } - if (entry.isBackup()) { - //select a new backup node - backup = publishEntryInfo(key, entry.getValue()); - } else if (entry.isProxy()) { - //invalidate the previous primary - msg = - new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false, (Serializable)key, null, null, - channel.getLocalMember(false), backup); - Member[] dest = getMapMembersExcl(backup); - if (dest != null && dest.length > 0) { - getChannel().send(dest, msg, getChannelSendOptions()); - } - } - entry.setPrimary(channel.getLocalMember(false)); - entry.setBackupNodes(backup); - entry.setBackup(false); - entry.setProxy(false); - - } catch (Exception x) { - log.error("Unable to replicate out data for a LazyReplicatedMap.get operation", x); - return null; - } - } - if (log.isTraceEnabled()) - log.trace("Requesting id:" + key + " result:" + entry.getValue()); - if (entry.getValue() != null && entry.getValue() instanceof ReplicatedMapEntry) { - ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue(); - //hack, somehow this is not being set above - val.setOwner(getMapOwner()); - - } - return entry.getValue(); - } - - protected void printMap(String header) { - try { - System.out.println("\nDEBUG MAP:" + header); - System.out.println("Map[" + new String(mapContextName, chset) + ", Map Size:" + super.size()); - Member[] mbrs = getMapMembers(); - for (int i = 0; i < mbrs.length; i++) { - System.out.println("Mbr[" + (i + 1) + "=" + mbrs[i].getName()); - } - Iterator i = super.entrySet().iterator(); - int cnt = 0; - - while (i.hasNext()) { - Map.Entry e = (Map.Entry)i.next(); - System.out.println((++cnt) + ". " + super.get(e.getKey())); - } - System.out.println("EndMap]\n\n"); - } catch (Exception ignore) { - ignore.printStackTrace(); - } - } - - /** - * Returns true if the key has an entry in the map. - * The entry can be a proxy or a backup entry, invoking <code>get(key)</code> - * will make this entry primary for the group - * @param key Object - * @return boolean - */ - public boolean containsKey(Object key) { - return super.containsKey(key); - } - - public Object put(Object key, Object value) { - return put(key, value, true); - } - - public Object put(Object key, Object value, boolean notify) { - MapEntry entry = new MapEntry(key, value); - entry.setBackup(false); - entry.setProxy(false); - entry.setPrimary(channel.getLocalMember(false)); - - Object old = null; - - //make sure that any old values get removed - if (containsKey(key)) - old = remove(key); - try { - if (notify) { - Member[] backup = publishEntryInfo(key, value); - entry.setBackupNodes(backup); - } - } catch (ChannelException x) { - log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x); - } - super.put(key, entry); - return old; - } - - /** - * Copies all values from one map to this instance - * @param m Map - */ - public void putAll(Map m) { - Iterator i = m.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry entry = (Map.Entry)i.next(); - put(entry.getKey(), entry.getValue()); - } - } - - public void clear() { - clear(true); - } - - public void clear(boolean notify) { - if (notify) { - //only delete active keys - Iterator<Object> keys = keySet().iterator(); - while (keys.hasNext()) - remove(keys.next()); - } else { - super.clear(); - } - } - - public boolean containsValue(Object value) { - if (value == null) { - return super.containsValue(value); - } else { - Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry<Object, Object> e = i.next(); - MapEntry entry = (MapEntry)super.get(e.getKey()); - if (entry != null && entry.isPrimary() && value.equals(entry.getValue())) - return true; - }//while - return false; - }//end if - } - - public Object clone() { - throw new UnsupportedOperationException("This operation is not valid on a replicated map"); - } - - /** - * Returns the entire contents of the map - * Map.Entry.getValue() will return a LazyReplicatedMap.MapEntry object containing all the information - * about the object. - * @return Set - */ - public Set<Map.Entry<Object, Object>> entrySetFull() { - return super.entrySet(); - } - - public Set<Object> keySetFull() { - return super.keySet(); - } - - public int sizeFull() { - return super.size(); - } - - public Set<Map.Entry<Object, Object>> entrySet() { - Set<Map.Entry<Object, Object>> set = new LinkedHashSet<Map.Entry<Object, Object>>(super.size()); - Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry<Object, Object> e = i.next(); - Object key = e.getKey(); - MapEntry entry = (MapEntry)super.get(key); - if (entry != null && entry.isPrimary()) { - set.add(new MapEntry(key, entry.getValue())); - } - } - return Collections.unmodifiableSet(set); - } - - public Set<Object> keySet() { - //todo implement - //should only return keys where this is active. - LinkedHashSet<Object> set = new LinkedHashSet<Object>(super.size()); - Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry<Object, Object> e = i.next(); - Object key = e.getKey(); - MapEntry entry = (MapEntry)super.get(key); - if (entry != null && entry.isPrimary()) - set.add(key); - } - return Collections.unmodifiableSet(set); - - } - - public int size() { - //todo, implement a counter variable instead - //only count active members in this node - int counter = 0; - Iterator<Map.Entry<Object, Object>> it = super.entrySet().iterator(); - while (it != null && it.hasNext()) { - Map.Entry<Object, Object> e = it.next(); - if (e != null) { - MapEntry entry = (MapEntry)super.get(e.getKey()); - if (entry != null && entry.isPrimary() && entry.getValue() != null) - counter++; - } - } - return counter; - } - - protected boolean removeEldestEntry(Map.Entry<Object, Object> eldest) { - return false; - } - - public boolean isEmpty() { - return size() == 0; - } - - public Collection<Object> values() { - ArrayList<Object> values = new ArrayList<Object>(); - Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry<Object, Object> e = i.next(); - MapEntry entry = (MapEntry)super.get(e.getKey()); - if (entry != null && entry.isPrimary() && entry.getValue() != null) - values.add(entry.getValue()); - } - return Collections.unmodifiableCollection(values); - } - - //------------------------------------------------------------------------------ - // Map Entry class - //------------------------------------------------------------------------------ - public static class MapEntry implements Map.Entry<Object, Object> { - private boolean backup; - private boolean proxy; - private Member[] backupNodes; - private Member primary; - private Object key; - private Object value; - - public MapEntry(Object key, Object value) { - setKey(key); - setValue(value); - - } - - public boolean isKeySerializable() { - return (key == null) || (key instanceof Serializable); - } - - public boolean isValueSerializable() { - return (value == null) || (value instanceof Serializable); - } - - public boolean isSerializable() { - return isKeySerializable() && isValueSerializable(); - } - - public boolean isBackup() { - return backup; - } - - public void setBackup(boolean backup) { - this.backup = backup; - } - - public boolean isProxy() { - return proxy; - } - - public boolean isPrimary() { - return ((!proxy) && (!backup)); - } - - public void setProxy(boolean proxy) { - this.proxy = proxy; - } - - public boolean isDiffable() { - return (value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDiffable(); - } - - public void setBackupNodes(Member[] nodes) { - this.backupNodes = nodes; - } - - public Member[] getBackupNodes() { - return backupNodes; - } - - public void setPrimary(Member m) { - primary = m; - } - - public Member getPrimary() { - return primary; - } - - public Object getValue() { - return value; - } - - public Object setValue(Object value) { - Object old = this.value; - this.value = value; - return old; - } - - public Object getKey() { - return key; - } - - public Object setKey(Object key) { - Object old = this.key; - this.key = key; - return old; - } - - public int hashCode() { - return key.hashCode(); - } - - public boolean equals(Object o) { - return key.equals(o); - } - - /** - * apply a diff, or an entire object - * @param data byte[] - * @param offset int - * @param length int - * @param diff boolean - * @throws IOException - * @throws ClassNotFoundException - */ - public void apply(byte[] data, int offset, int length, boolean diff) throws IOException, ClassNotFoundException { - if (isDiffable() && diff) { - ReplicatedMapEntry rentry = (ReplicatedMapEntry)value; - try { - rentry.lock(); - rentry.applyDiff(data, offset, length); - } finally { - rentry.unlock(); - } - } else if (length == 0) { - value = null; - proxy = true; - } else { - value = XByteBuffer.deserialize(data, offset, length); - } - } - - public String toString() { - StringBuffer buf = new StringBuffer("MapEntry[key:"); - buf.append(getKey()).append("; "); - buf.append("value:").append(getValue()).append("; "); - buf.append("primary:").append(isPrimary()).append("; "); - buf.append("backup:").append(isBackup()).append("; "); - buf.append("proxy:").append(isProxy()).append(";]"); - return buf.toString(); - } - - } - - //------------------------------------------------------------------------------ - // map message to send to and from other maps - //------------------------------------------------------------------------------ - - public static class MapMessage implements Serializable { - private static final long serialVersionUID = -7847288807489375686L; - public static final int MSG_BACKUP = 1; - public static final int MSG_RETRIEVE_BACKUP = 2; - public static final int MSG_PROXY = 3; - public static final int MSG_REMOVE = 4; - public static final int MSG_STATE = 5; - public static final int MSG_START = 6; - public static final int MSG_STOP = 7; - public static final int MSG_INIT = 8; - public static final int MSG_COPY = 9; - public static final int MSG_STATE_COPY = 10; - - private byte[] mapId; - private int msgtype; - private boolean diff; - private transient Serializable key; - private transient Serializable value; - private byte[] valuedata; - private byte[] keydata; - private byte[] diffvalue; - private Member[] nodes; - private Member primary; - - public String toString() { - StringBuffer buf = new StringBuffer("MapMessage[context="); - buf.append(new String(mapId)); - buf.append("; type="); - buf.append(getTypeDesc()); - buf.append("; key="); - buf.append(key); - buf.append("; value="); - buf.append(value); - return buf.toString(); - } - - public String getTypeDesc() { - switch (msgtype) { - case MSG_BACKUP: - return "MSG_BACKUP"; - case MSG_RETRIEVE_BACKUP: - return "MSG_RETRIEVE_BACKUP"; - case MSG_PROXY: - return "MSG_PROXY"; - case MSG_REMOVE: - return "MSG_REMOVE"; - case MSG_STATE: - return "MSG_STATE"; - case MSG_START: - return "MSG_START"; - case MSG_STOP: - return "MSG_STOP"; - case MSG_INIT: - return "MSG_INIT"; - case MSG_STATE_COPY: - return "MSG_STATE_COPY"; - case MSG_COPY: - return "MSG_COPY"; - default: - return "UNKNOWN"; - } - } - - public MapMessage() { - } - - public MapMessage(byte[] mapId, - int msgtype, - boolean diff, - Serializable key, - Serializable value, - byte[] diffvalue, - Member primary, - Member[] nodes) { - this.mapId = mapId; - this.msgtype = msgtype; - this.diff = diff; - this.key = key; - this.value = value; - this.diffvalue = diffvalue; - this.nodes = nodes; - this.primary = primary; - setValue(value); - setKey(key); - } - - public void deserialize(ClassLoader[] cls) throws IOException, ClassNotFoundException { - key(cls); - value(cls); - } - - public int getMsgType() { - return msgtype; - } - - public boolean isDiff() { - return diff; - } - - public Serializable getKey() { - try { - return key(null); - } catch (Exception x) { - log.error("Deserialization error of the MapMessage.key", x); - return null; - } - } - - public Serializable key(ClassLoader[] cls) throws IOException, ClassNotFoundException { - if (key != null) - return key; - if (keydata == null || keydata.length == 0) - return null; - key = XByteBuffer.deserialize(keydata, 0, keydata.length, cls); - keydata = null; - return key; - } - - public byte[] getKeyData() { - return keydata; - } - - public Serializable getValue() { - try { - return value(null); - } catch (Exception x) { - log.error("Deserialization error of the MapMessage.value", x); - return null; - } - } - - public Serializable value(ClassLoader[] cls) throws IOException, ClassNotFoundException { - if (value != null) - return value; - if (valuedata == null || valuedata.length == 0) - return null; - value = XByteBuffer.deserialize(valuedata, 0, valuedata.length, cls); - valuedata = null; - ; - return value; - } - - public byte[] getValueData() { - return valuedata; - } - - public byte[] getDiffValue() { - return diffvalue; - } - - public Member[] getBackupNodes() { - return nodes; - } - - private void setBackUpNodes(Member[] nodes) { - this.nodes = nodes; - } - - public Member getPrimary() { - return primary; - } - - private void setPrimary(Member m) { - primary = m; - } - - public byte[] getMapId() { - return mapId; - } - - public void setValue(Serializable value) { - try { - if (value != null) - valuedata = XByteBuffer.serialize(value); - this.value = value; - } catch (IOException x) { - throw new RuntimeException(x); - } - } - - public void setKey(Serializable key) { - try { - if (key != null) - keydata = XByteBuffer.serialize(key); - this.key = key; - } catch (IOException x) { - throw new RuntimeException(x); - } - } - - protected Member[] readMembers(ObjectInput in) throws IOException, ClassNotFoundException { - int nodecount = in.readInt(); - Member[] members = new Member[nodecount]; - for (int i = 0; i < members.length; i++) { - byte[] d = new byte[in.readInt()]; - in.read(d); - if (d.length > 0) - members[i] = MemberImpl.getMember(d); - } - return members; - } - - protected void writeMembers(ObjectOutput out, Member[] members) throws IOException { - if (members == null) - members = new Member[0]; - out.writeInt(members.length); - for (int i = 0; i < members.length; i++) { - if (members[i] != null) { - byte[] d = members[i] != null ? ((MemberImpl)members[i]).getData(false) : new byte[0]; - out.writeInt(d.length); - out.write(d); - } - } - } - - /** - * shallow clone - * @return Object - */ - public Object clone() { - MapMessage msg = - new MapMessage(this.mapId, this.msgtype, this.diff, this.key, this.value, this.diffvalue, this.primary, - this.nodes); - msg.keydata = this.keydata; - msg.valuedata = this.valuedata; - return msg; - } - } //MapMessage - - public Channel getChannel() { - return channel; - } - - public byte[] getMapContextName() { - return mapContextName; - } - - public RpcChannel getRpcChannel() { - return rpcChannel; - } - - public long getRpcTimeout() { - return rpcTimeout; - } - - public Object getStateMutex() { - return stateMutex; - } - - public boolean isStateTransferred() { - return stateTransferred; - } - - public MapOwner getMapOwner() { - return mapOwner; - } - - public ClassLoader[] getExternalLoaders() { - return externalLoaders; - } - - public int getChannelSendOptions() { - return channelSendOptions; - } - - public long getAccessTimeout() { - return accessTimeout; - } - - public void setMapOwner(MapOwner mapOwner) { - this.mapOwner = mapOwner; - } - - public void setExternalLoaders(ClassLoader[] externalLoaders) { - this.externalLoaders = externalLoaders; - } - - public void setChannelSendOptions(int channelSendOptions) { - this.channelSendOptions = channelSendOptions; - } - - public void setAccessTimeout(long accessTimeout) { - this.accessTimeout = accessTimeout; - } - -} diff --git a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/DisableMcastInterceptor.java b/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/DisableMcastInterceptor.java deleted file mode 100644 index 1e4d3af50d..0000000000 --- a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/DisableMcastInterceptor.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.endpoint.tribes; - -import org.apache.catalina.tribes.Channel; -import org.apache.catalina.tribes.ChannelException; -import org.apache.catalina.tribes.group.ChannelInterceptorBase; - -/** - * A static interceptor to disables multicast. - * Can be removed when/if the function gets added to Tribes. - * See Tomcat email http://markmail.org/message/doqu7pfl2hvvdfcl - */ -public class DisableMcastInterceptor extends ChannelInterceptorBase { - - public DisableMcastInterceptor() { - super(); - } - - public void start(int svc) throws ChannelException { - svc = (svc & (~Channel.MBR_TX_SEQ)); - super.start(svc); - } -} diff --git a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java b/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java deleted file mode 100644 index 9e540743bf..0000000000 --- a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.endpoint.tribes; - -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; - -/** - * The Map that can fire events on put/remove of entries - */ -public abstract class MapStore extends ConcurrentHashMap<Object, Object> { - private static final long serialVersionUID = -2127235547082144368L; - private List<MapListener> listeners = new CopyOnWriteArrayList<MapListener>(); - - protected MapStore(int initialCapacity, float loadFactor, int concurrencyLevel) { - super(initialCapacity, loadFactor, concurrencyLevel); - } - - @Override - public Object put(Object key, Object value) { - Object old = super.put(key, value); - if (old != null) { - for (MapListener listener : listeners) { - listener.entryUpdated(key, old, value); - } - } else { - for (MapListener listener : listeners) { - listener.entryAdded(key, value); - } - - } - return old; - } - - @Override - public Object remove(Object key) { - Object old = super.remove(key); - if (old != null) { - for (MapListener listener : listeners) { - listener.entryRemoved(key, old); - } - } - return old; - } - - public void addListener(MapListener listener) { - listeners.add(listener); - } - - public List<MapListener> getListeners() { - return listeners; - } - - public boolean removeListener(MapListener listener) { - return listeners.remove(listener); - } - - public static interface MapListener { - void entryAdded(Object key, Object value); - - void entryUpdated(Object key, Object oldValue, Object newValue); - - void entryRemoved(Object key, Object value); - } -} diff --git a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedDomainRegistry.java b/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedDomainRegistry.java deleted file mode 100644 index e101f916dd..0000000000 --- a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedDomainRegistry.java +++ /dev/null @@ -1,509 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.endpoint.tribes; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.net.URI; -import java.net.URLDecoder; -import java.util.ArrayList; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.StringTokenizer; -import java.util.concurrent.Callable; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.xml.namespace.QName; - -import org.apache.catalina.tribes.Channel; -import org.apache.catalina.tribes.ChannelException; -import org.apache.catalina.tribes.ChannelReceiver; -import org.apache.catalina.tribes.Member; -import org.apache.catalina.tribes.group.GroupChannel; -import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor; -import org.apache.catalina.tribes.membership.McastService; -import org.apache.catalina.tribes.membership.StaticMember; -import org.apache.catalina.tribes.transport.ReceiverBase; -import org.apache.tuscany.sca.assembly.Composite; -import org.apache.tuscany.sca.assembly.Endpoint; -import org.apache.tuscany.sca.core.ExtensionPointRegistry; -import org.apache.tuscany.sca.core.LifeCycleListener; -import org.apache.tuscany.sca.endpoint.tribes.AbstractReplicatedMap.MapEntry; -import org.apache.tuscany.sca.endpoint.tribes.MapStore.MapListener; -import org.apache.tuscany.sca.runtime.BaseDomainRegistry; -import org.apache.tuscany.sca.runtime.DomainRegistryURI; -import org.apache.tuscany.sca.runtime.DomainRegistry; -import org.apache.tuscany.sca.runtime.ContributionDescription; -import org.apache.tuscany.sca.runtime.RuntimeEndpoint; - -/** - * A replicated DomainRegistry based on Apache Tomcat Tribes - */ -public class ReplicatedDomainRegistry extends BaseDomainRegistry implements DomainRegistry, LifeCycleListener, - MapListener { - private final static Logger logger = Logger.getLogger(ReplicatedDomainRegistry.class.getName()); - private static final String MULTICAST_ADDRESS = "228.0.0.100"; - private static final int MULTICAST_PORT = 50000; - - private static final int FIND_REPEAT_COUNT = 10; - - private int port = MULTICAST_PORT; - private String address = MULTICAST_ADDRESS; - private String bind = null; - private int timeout = 50; - private String receiverAddress; - private int receiverPort = 4000; - private int receiverAutoBind = 100; - private List<URI> staticRoutes; - - private ReplicatedMap map; - - private String id; - private boolean noMultiCast; - - private static final GroupChannel createChannel(String address, int port, String bindAddress) { - - //create a channel - GroupChannel channel = new GroupChannel(); - McastService mcastService = (McastService)channel.getMembershipService(); - mcastService.setPort(port); - mcastService.setAddress(address); - - // REVIEW: In my case, there are multiple IP addresses - // One for the WIFI and the other one for VPN. For some reason the VPN one doesn't support - // Multicast - - if (bindAddress != null) { - mcastService.setBind(bindAddress); - } else { - mcastService.setBind(getBindAddress()); - } - - return channel; - } - - public ReplicatedDomainRegistry(ExtensionPointRegistry registry, - Map<String, String> attributes, - String domainRegistryURI, - String domainURI) { - super(registry, attributes, domainRegistryURI, domainURI); - getParameters(attributes, domainRegistryURI); - } - - private Map<String, String> getParameters(Map<String, String> attributes, String domainRegistryURI) { - Map<String, String> map = new HashMap<String, String>(); - if (attributes != null) { - map.putAll(attributes); - } - URI uri = URI.create(domainRegistryURI); - if (uri.getHost() != null) { - map.put("address", uri.getHost()); - } - if (uri.getPort() != -1) { - map.put("port", String.valueOf(uri.getPort())); - } - - if (domainRegistryURI.startsWith("tuscany")) { - setTuscanyConfig(map, domainRegistryURI); - setConfig(map); - return map; - } - - int index = domainRegistryURI.indexOf('?'); - if (index == -1) { - setConfig(map); - return map; - } - String query = domainRegistryURI.substring(index + 1); - try { - query = URLDecoder.decode(query, "UTF-8"); - } catch (UnsupportedEncodingException e) { - throw new IllegalArgumentException(e); - } - String[] params = query.split("&"); - for (String param : params) { - index = param.indexOf('='); - if (index != -1) { - map.put(param.substring(0, index), param.substring(index + 1)); - } - } - setConfig(map); - return map; - } - - private void setTuscanyConfig(Map<String, String> map, String domainRegistryURI) { - DomainRegistryURI tuscanyURI = new DomainRegistryURI(domainRegistryURI); - map.put("address", tuscanyURI.getMulticastAddress()); - map.put("port", Integer.toString(tuscanyURI.getMulticastPort())); - map.put("bind", tuscanyURI.getBindAddress()); - map.put("receiverPort", Integer.toString(tuscanyURI.getListenPort())); - if (tuscanyURI.isMulticastDisabled()) { - map.put("nomcast", "true"); - } - if (tuscanyURI.getRemotes().size() > 0) { - String routes = ""; - for (int i=0; i<tuscanyURI.getRemotes().size(); i++) { - routes += tuscanyURI.getRemotes().get(i); - if (i < tuscanyURI.getRemotes().size()) { - routes += ","; - } - } - map.put("routes", routes); - } - } - - private void setConfig(Map<String, String> attributes) { - String portStr = attributes.get("port"); - if (portStr != null) { - port = Integer.parseInt(portStr); - if (port == -1) { - port = MULTICAST_PORT; - } - } - String address = attributes.get("address"); - if (address == null) { - address = MULTICAST_ADDRESS; - } - bind = attributes.get("bind"); - String timeoutStr = attributes.get("timeout"); - if (timeoutStr != null) { - timeout = Integer.parseInt(timeoutStr); - } - - String routesStr = attributes.get("routes"); - if (routesStr != null) { - StringTokenizer st = new StringTokenizer(routesStr); - staticRoutes = new ArrayList<URI>(); - while (st.hasMoreElements()) { - staticRoutes.add(URI.create("tcp://" + st.nextToken())); - } - } - String mcast = attributes.get("nomcast"); - if (mcast != null) { - noMultiCast = Boolean.valueOf(mcast); - } - receiverAddress = attributes.get("receiverAddress"); - String recvPort = attributes.get("receiverPort"); - if (recvPort != null) { - receiverPort = Integer.parseInt(recvPort); - } - String recvAutoBind = attributes.get("receiverAutoBind"); - if (recvAutoBind != null) { - receiverAutoBind = Integer.parseInt(recvAutoBind); - } - } - - public void start() { - if (map != null) { - throw new IllegalStateException("The registry has already been started"); - } - GroupChannel channel = createChannel(address, port, bind); - map = - new ReplicatedMap(null, channel, timeout, this.domainURI, - new ClassLoader[] {ReplicatedDomainRegistry.class.getClassLoader()}); - map.addListener(this); - - if (noMultiCast) { - map.getChannel().addInterceptor(new DisableMcastInterceptor()); - } - - // Configure the receiver ports - ChannelReceiver receiver = channel.getChannelReceiver(); - if (receiver instanceof ReceiverBase) { - if (receiverAddress != null) { - ((ReceiverBase)receiver).setAddress(receiverAddress); - } - ((ReceiverBase)receiver).setPort(receiverPort); - ((ReceiverBase)receiver).setAutoBind(receiverAutoBind); - } - - /* - Object sender = channel.getChannelSender(); - if (sender instanceof ReplicationTransmitter) { - sender = ((ReplicationTransmitter)sender).getTransport(); - } - if (sender instanceof AbstractSender) { - ((AbstractSender)sender).setKeepAliveCount(0); - ((AbstractSender)sender).setMaxRetryAttempts(5); - } - */ - - if (staticRoutes != null) { - StaticMembershipInterceptor smi = new StaticMembershipInterceptor(); - for (URI staticRoute : staticRoutes) { - Member member; - try { - // The port has to match the receiver port - member = new StaticMember(staticRoute.getHost(), staticRoute.getPort(), 5000); - } catch (IOException e) { - throw new RuntimeException(e); - } - smi.addStaticMember(member); - logger.info("Added static route: " + staticRoute.getHost() + ":" + staticRoute.getPort()); - } - smi.setLocalMember(map.getChannel().getLocalMember(false)); - map.getChannel().addInterceptor(smi); - } - - try { - map.getChannel().start(Channel.DEFAULT); - } catch (ChannelException e) { - throw new IllegalStateException(e); - } - - } - - public void stop() { - if (map != null) { - map.removeListener(this); - Channel channel = map.getChannel(); - map.breakdown(); - try { - channel.stop(Channel.DEFAULT); - } catch (ChannelException e) { - logger.log(Level.WARNING, e.getMessage(), e); - } - map = null; - } - } - - public void addEndpoint(Endpoint endpoint) { - map.put(endpoint.getURI(), endpoint); - logger.info("Add endpoint - " + endpoint); - } - - public List<Endpoint> findEndpoint(String uri) { - List<Endpoint> foundEndpoints = new ArrayList<Endpoint>(); - - // in the failure case we repeat the look up after a short - // delay to take account of tribes replication delays - int repeat = FIND_REPEAT_COUNT; - - while (repeat > 0) { - for (Object v : map.values()) { - Endpoint endpoint = (Endpoint)v; - // TODO: implement more complete matching - logger.fine("Matching against - " + endpoint); - if (endpoint.matches(uri)) { - MapEntry entry = map.getInternal(endpoint.getURI()); - // if (!entry.isPrimary()) { - ((RuntimeEndpoint)endpoint).bind(registry, this); - // } - foundEndpoints.add(endpoint); - logger.fine("Found endpoint with matching service - " + endpoint); - repeat = 0; - } - // else the service name doesn't match - } - - if (foundEndpoints.size() == 0) { - // the service name doesn't match any endpoints so wait a little and try - // again in case this is caused by tribes synch delays - logger.info("Repeating endpoint reference match - " + uri); - repeat--; - try { - Thread.sleep(1000); - } catch (Exception ex) { - // do nothing - repeat = 0; - } - } - } - - return foundEndpoints; - } - - private boolean isLocal(MapEntry entry) { - return entry.getPrimary().equals(map.getChannel().getLocalMember(false)); - } - - public Endpoint getEndpoint(String uri) { - return (Endpoint)map.get(uri); - } - - public List<Endpoint> getEndpoints() { - return new ArrayList(map.values()); - } - - public void removeEndpoint(Endpoint endpoint) { - map.remove(endpoint.getURI()); - logger.info("Remove endpoint - " + endpoint); - } - - public void replicate(boolean complete) { - map.replicate(complete); - } - - public void updateEndpoint(String uri, Endpoint endpoint) { - Endpoint oldEndpoint = getEndpoint(uri); - if (oldEndpoint == null) { - throw new IllegalArgumentException("Endpoint is not found: " + uri); - } - map.put(endpoint.getURI(), endpoint); - } - - public void entryAdded(Object key, Object value) { - MapEntry entry = (MapEntry)value; - Endpoint newEp = (Endpoint)entry.getValue(); - if (!isLocal(entry)) { - logger.info(id + " Remote endpoint added: " + entry.getValue()); - } - endpointAdded(newEp); - } - - public void entryRemoved(Object key, Object value) { - MapEntry entry = (MapEntry)value; - if (!isLocal(entry)) { - logger.info(id + " Remote endpoint removed: " + entry.getValue()); - } - endpointRemoved((Endpoint)entry.getValue()); - } - - public void entryUpdated(Object key, Object oldValue, Object newValue) { - MapEntry oldEntry = (MapEntry)oldValue; - MapEntry newEntry = (MapEntry)newValue; - if (!isLocal(newEntry)) { - logger.info(id + " Remote endpoint updated: " + newEntry.getValue()); - } - Endpoint oldEp = (Endpoint)oldEntry.getValue(); - Endpoint newEp = (Endpoint)newEntry.getValue(); - endpointUpdated(oldEp, newEp); - } - - private static String getBindAddress() { - try { - Enumeration<NetworkInterface> nis = NetworkInterface.getNetworkInterfaces(); - while (nis.hasMoreElements()) { - NetworkInterface ni = nis.nextElement(); - // The following APIs require JDK 1.6 - /* - if (ni.isLoopback() || !ni.isUp() || !ni.supportsMulticast()) { - continue; - } - */ - Enumeration<InetAddress> ips = ni.getInetAddresses(); - if (!ips.hasMoreElements()) { - continue; - } - while (ips.hasMoreElements()) { - InetAddress addr = ips.nextElement(); - if (addr.isLoopbackAddress()) { - continue; - } - return addr.getHostAddress(); - } - } - return InetAddress.getLocalHost().getHostAddress(); - } catch (Exception e) { - logger.log(Level.SEVERE, e.getMessage(), e); - return null; - } - } - - @Override - public List<String> getInstalledContributionURIs() { - // TODO Auto-generated method stub - return null; - } - - @Override - public void uninstallContribution(String uri) { - // TODO Auto-generated method stub - - } - - @Override - public void installContribution(ContributionDescription cd) { - // TODO Auto-generated method stub - - } - - @Override - public ContributionDescription getInstalledContribution(String uri) { - // TODO Auto-generated method stub - return null; - } - - @Override - public void addRunningComposite(String contributionURI, Composite composite) { - // TODO Auto-generated method stub - - } - - @Override - public void removeRunningComposite(String contributionURI, String compositeURI) { - // TODO Auto-generated method stub - - } - - @Override - public Composite getRunningComposite(String contributionURI, String compositeURI) { - // TODO Auto-generated method stub - return null; - } - - @Override - public Map<String, List<String>> getRunningCompositeURIs() { - // TODO Auto-generated method stub - return null; - } - - @Override - public void updateInstalledContribution(ContributionDescription cd) { - // TODO Auto-generated method stub - - } - - @Override - public List<String> getNodeNames() { - // TODO Auto-generated method stub - return null; - } - - @Override - public String getLocalNodeName() { - // TODO Auto-generated method stub - return null; - } - - @Override - public String getRunningNodeName(String contributionURI, String compositeURI) { - // TODO Auto-generated method stub - return null; - } - - @Override - public String remoteCommand(String memberName, Callable<String> command) { - // TODO Auto-generated method stub - return null; - } - - @Override - public String getContainingCompositesContributionURI(String componentName) { - // TODO Auto-generated method stub - return null; - } - -} diff --git a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java b/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java deleted file mode 100644 index 669ad82192..0000000000 --- a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tuscany.sca.endpoint.tribes; - -import java.io.Serializable; -import java.util.Iterator; -import java.util.Map; - -import org.apache.catalina.tribes.Channel; -import org.apache.catalina.tribes.ChannelException; -import org.apache.catalina.tribes.ChannelListener; -import org.apache.catalina.tribes.Member; -import org.apache.catalina.tribes.MembershipListener; -import org.apache.catalina.tribes.group.RpcCallback; - -/** - * This file is copied from: - * https://svn.apache.org/repos/asf/tomcat/tc6.0.x/tags/TOMCAT_6_0_20/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java - * - */ -public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback, ChannelListener, MembershipListener { - private static final long serialVersionUID = -6318779627600581121L; - protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(ReplicatedMap.class); - - //------------------------------------------------------------------------------ - // CONSTRUCTORS / DESTRUCTORS - //------------------------------------------------------------------------------ - /** - * Creates a new map - * @param channel The channel to use for communication - * @param timeout long - timeout for RPC messags - * @param mapContextName String - unique name for this map, to allow multiple maps per channel - * @param initialCapacity int - the size of this map, see HashMap - * @param loadFactor float - load factor, see HashMap - */ - public ReplicatedMap(MapOwner owner, - Channel channel, - long timeout, - String mapContextName, - int initialCapacity, - float loadFactor, - ClassLoader[] cls) { - super(owner, channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT, cls); - } - - /** - * Creates a new map - * @param channel The channel to use for communication - * @param timeout long - timeout for RPC messags - * @param mapContextName String - unique name for this map, to allow multiple maps per channel - * @param initialCapacity int - the size of this map, see HashMap - */ - public ReplicatedMap(MapOwner owner, - Channel channel, - long timeout, - String mapContextName, - int initialCapacity, - ClassLoader[] cls) { - super(owner, channel, timeout, mapContextName, initialCapacity, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, - Channel.SEND_OPTIONS_DEFAULT, cls); - } - - /** - * Creates a new map - * @param channel The channel to use for communication - * @param timeout long - timeout for RPC messags - * @param mapContextName String - unique name for this map, to allow multiple maps per channel - */ - public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, ClassLoader[] cls) { - super(owner, channel, timeout, mapContextName, AbstractReplicatedMap.DEFAULT_INITIAL_CAPACITY, - AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT, cls); - } - - //------------------------------------------------------------------------------ - // METHODS TO OVERRIDE - //------------------------------------------------------------------------------ - protected int getStateMessageType() { - return AbstractReplicatedMap.MapMessage.MSG_STATE_COPY; - } - - /** - * publish info about a map pair (key/value) to other nodes in the cluster - * @param key Object - * @param value Object - * @return Member - the backup node - * @throws ChannelException - */ - protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException { - if (!(key instanceof Serializable && value instanceof Serializable)) - return new Member[0]; - //select a backup node - Member[] members = getMapMembers(); - - if (members == null || members.length == 0) { - return new Member[0]; - } - - //publish the data out to all nodes - MapMessage msg = - new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable)key, (Serializable)value, - null, channel.getLocalMember(false), members); - - getChannel().send(members, msg, getChannelSendOptions()); - - return members; - } - - /** - * Override the base method to look up existing entries only - */ - public Object get(Object key) { - MapEntry entry = super.getInternal(key); - if (log.isTraceEnabled()) - log.trace("Requesting id:" + key + " entry:" + entry); - if (entry == null) { - return null; - } - return entry.getValue(); - } - - /** - * Override the base method to remove all entries owned by the member that disappeared - */ - public void memberDisappeared(Member member) { - boolean removed = false; - synchronized (mapMembers) { - removed = (mapMembers.remove(member) != null); - if (!removed) { - if (log.isDebugEnabled()) - log.debug("Member[" + member + "] disappeared, but was not present in the map."); - return; //the member was not part of our map. - } - } - - Iterator<Map.Entry<Object, Object>> i = super.entrySetFull().iterator(); - while (i.hasNext()) { - Map.Entry<Object, Object> e = i.next(); - MapEntry entry = (MapEntry)super.getInternal(e.getKey()); - if (entry == null) { - continue; - } - if (member.equals(entry.getPrimary())) { - if (log.isDebugEnabled()) - log.debug("[2] Primary disappeared"); - i.remove(); - } //end if - } //while - } -} diff --git a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/TribesDomainRegistryFactory.java b/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/TribesDomainRegistryFactory.java deleted file mode 100644 index d0beb95141..0000000000 --- a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/TribesDomainRegistryFactory.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.endpoint.tribes; - -import org.apache.tuscany.sca.core.ExtensionPointRegistry; -import org.apache.tuscany.sca.runtime.BaseDomainRegistryFactory; -import org.apache.tuscany.sca.runtime.DomainRegistry; - -/** - * The utility responsible for finding the endpoint regstry by the scheme and creating instances for the - * given domain - */ -public class TribesDomainRegistryFactory extends BaseDomainRegistryFactory { - private final static String[] schemes = new String[] {"multicast", "tribes", "tuscany"}; - - /** - * @param extensionRegistry - */ - public TribesDomainRegistryFactory(ExtensionPointRegistry registry) { - super(registry); - } - - protected DomainRegistry createEndpointRegistry(String endpointRegistryURI, String domainURI) { - DomainRegistry domainRegistry = - new ReplicatedDomainRegistry(registry, null, endpointRegistryURI, domainURI); - return domainRegistry; - } - - public String[] getSupportedSchemes() { - return schemes; - } -} diff --git a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.DomainRegistryFactory b/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.DomainRegistryFactory deleted file mode 100644 index 734afb2ac5..0000000000 --- a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.DomainRegistryFactory +++ /dev/null @@ -1,17 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-org.apache.tuscany.sca.endpoint.tribes.TribesDomainRegistryFactory;ranking=50
\ No newline at end of file diff --git a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java b/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java deleted file mode 100644 index 96d2119b87..0000000000 --- a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.endpoint.tribes; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.tuscany.sca.assembly.AssemblyFactory; -import org.apache.tuscany.sca.assembly.Binding; -import org.apache.tuscany.sca.assembly.Component; -import org.apache.tuscany.sca.assembly.Endpoint; -import org.apache.tuscany.sca.assembly.SCABindingFactory; -import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry; -import org.apache.tuscany.sca.core.ExtensionPointRegistry; -import org.apache.tuscany.sca.core.FactoryExtensionPoint; -import org.apache.tuscany.sca.runtime.EndpointListener; -import org.apache.tuscany.sca.runtime.RuntimeEndpoint; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; - -// Ignore so its not run in the build yet till its working -@Ignore("TUSCANY-3718") -public class MultiRegTestCase implements EndpointListener { - private static ExtensionPointRegistry extensionPoints; - private static AssemblyFactory assemblyFactory; - private static SCABindingFactory scaBindingFactory; - - @BeforeClass - public static void init() { - extensionPoints = new DefaultExtensionPointRegistry(); - FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class); - assemblyFactory = factories.getFactory(AssemblyFactory.class); - scaBindingFactory = factories.getFactory(SCABindingFactory.class); - } - - @Test - public void testReplication() throws Exception { - RuntimeEndpoint ep1 = createEndpoint("ep1uri"); - - // String host = InetAddress.getLocalHost().getHostAddress(); - String bind = "127.0.0.1"; // "9.65.158.31"; - String port1 = "8085"; - String port2 = "8086"; - String port3 = "8087"; - String range = "1"; - - Map<String, String> attrs1 = new HashMap<String, String>(); - // attrs1.put("nomcast", "true"); - attrs1.put("bind", bind); - attrs1.put("receiverPort", port1); - attrs1.put("receiverAutoBind", range); - // attrs1.put("routes", host + ":" + port2 + " " + host + ":" + port3); - ReplicatedDomainRegistry reg1 = new ReplicatedDomainRegistry(extensionPoints, attrs1, "foo", "bar"); - reg1.addEndpointListener(this); - reg1.start(); - - Map<String, String> attrs2 = new HashMap<String, String>(); - // attrs2.put("nomcast", "true"); - attrs2.put("bind", bind); - attrs2.put("receiverPort", port2); - attrs2.put("receiverAutoBind", range); - // attrs2.put("routes", host + ":"+port1); - ReplicatedDomainRegistry reg2 = new ReplicatedDomainRegistry(extensionPoints, attrs2, "foo", "bar"); - reg2.addEndpointListener(this); - reg2.start(); - - Map<String, String> attrs3 = new HashMap<String, String>(); - // attrs3.put("nomcast", "true"); - attrs3.put("bind", bind); - attrs3.put("receiverPort", port3); - attrs3.put("receiverAutoBind", range); - // attrs3.put("routes", host + ":"+port1); - ReplicatedDomainRegistry reg3 = new ReplicatedDomainRegistry(extensionPoints, attrs3, "foo", "bar"); - reg3.addEndpointListener(this); - reg3.start(); - - ep1.bind(extensionPoints, reg1); - reg1.addEndpoint(ep1); - assertExists(reg1, "ep1uri"); - assertExists(reg2, "ep1uri"); - assertExists(reg3, "ep1uri"); - - RuntimeEndpoint ep2 = createEndpoint("ep2uri"); - ep2.bind(extensionPoints, reg2); - reg2.addEndpoint(ep2); - assertExists(reg2, "ep2uri"); - assertExists(reg1, "ep2uri"); - assertExists(reg3, "ep2uri"); - - reg1.stop(); - Thread.sleep(6000); - Assert.assertNull(reg2.getEndpoint("ep1uri")); - Assert.assertNull(reg3.getEndpoint("ep1uri")); - assertExists(reg2, "ep2uri"); - assertExists(reg3, "ep2uri"); - - reg1.start(); - ep1.bind(extensionPoints, reg1); - reg1.addEndpoint(ep1); - assertExists(reg1, "ep1uri"); - assertExists(reg2, "ep1uri"); - assertExists(reg3, "ep1uri"); - - reg1.stop(); - reg2.stop(); - reg3.stop(); - System.out.println(); // closed - } - - private Endpoint assertExists(ReplicatedDomainRegistry reg, String uri) throws InterruptedException { - Endpoint ep = null; - int count = 0; - while (ep == null && count < 15) { - ep = reg.getEndpoint(uri); - Thread.sleep(1000); - count++; - System.out.println(reg + ": tries=" + count); - } - Assert.assertNotNull(ep); - Assert.assertEquals(uri, ep.getURI()); - return ep; - } - - private RuntimeEndpoint createEndpoint(String uri) { - RuntimeEndpoint ep = (RuntimeEndpoint) assemblyFactory.createEndpoint(); - Component comp = assemblyFactory.createComponent(); - ep.setComponent(comp); - ep.setService(assemblyFactory.createComponentService()); - Binding b = scaBindingFactory.createSCABinding(); - ep.setBinding(b); - ep.setURI(uri); - return ep; - } - - private void print(String prefix, Endpoint ep) { - System.out.println(prefix + ": "+ep); - } - - public void endpointAdded(Endpoint endpoint) { - print("Added", endpoint); - } - - public void endpointRemoved(Endpoint endpoint) { - print("Removed", endpoint); - } - - public void endpointUpdated(Endpoint oldEndpoint, Endpoint newEndpoint) { - print("Updated", newEndpoint); - } - -} diff --git a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistryTestCase.java b/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistryTestCase.java deleted file mode 100644 index f469983c76..0000000000 --- a/sca-java-2.x/branches/2.0-Beta3/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistryTestCase.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.endpoint.tribes; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.tuscany.sca.assembly.AssemblyFactory; -import org.apache.tuscany.sca.assembly.Endpoint; -import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry; -import org.apache.tuscany.sca.core.FactoryExtensionPoint; -import org.apache.tuscany.sca.runtime.RuntimeEndpoint; -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Test; - -@Ignore("TUSCANY-3718") -public class ReplicatedEndpointRegistryTestCase { - - @Test - // @Ignore("Ignore this test case for now as it might be sensitive to the multicast settings for a multi-homed machine") - public void testReplicate() throws InterruptedException { - DefaultExtensionPointRegistry extensionPoints = new DefaultExtensionPointRegistry(); - FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class); - AssemblyFactory assemblyFactory = factories.getFactory(AssemblyFactory.class); - - Map<String, String> attrs = new HashMap<String, String>(); - attrs.put("bind", "127.0.0.1"); - ReplicatedDomainRegistry ep1 = new ReplicatedDomainRegistry(extensionPoints, attrs, "foo", "bar"); - System.out.println("ep1 is: " + ep1); - ep1.start(); - - Endpoint e1 = assemblyFactory.createEndpoint(); - e1.setURI("e1uri"); - ((RuntimeEndpoint) e1).bind(extensionPoints, ep1); - ep1.addEndpoint(e1); - - Endpoint e1p = ep1.getEndpoint("e1uri"); - System.out.println("EP1 in Registry 1: " + e1p); - Assert.assertNotNull(e1p); - - ReplicatedDomainRegistry ep2 = new ReplicatedDomainRegistry(extensionPoints, attrs, "foo", "bar"); - System.out.println("ep2 is: " + ep2); - ep2.start(); - Thread.sleep(5000); - - Endpoint e1p2 = ep2.getEndpoint("e1uri"); - System.out.println("EP1 in Registry 2: " + e1p2); - Assert.assertNotNull(e1p2); - - ReplicatedDomainRegistry ep3 = new ReplicatedDomainRegistry(extensionPoints, attrs, "foo", "bar"); - System.out.println("ep3 is: " + ep3); - ep3.start(); - Thread.sleep(5000); - - Endpoint e1p3 = ep3.getEndpoint("e1uri"); - System.out.println("EP1 in Registry 3: " + e1p3); - Assert.assertNotNull(e1p3); - - ep1.stop(); - ep2.stop(); - ep3.stop(); - } - - public static void main(String[] args) throws Exception { - new ReplicatedEndpointRegistryTestCase().testReplicate(); - } -} |