summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/META-INF/MANIFEST.MF28
-rw-r--r--sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/pom.xml65
-rw-r--r--sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java1564
-rw-r--r--sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/DisableMcastInterceptor.java41
-rw-r--r--sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java83
-rw-r--r--sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java420
-rw-r--r--sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java163
-rw-r--r--sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/TribesDomainRegistryFactory.java49
-rw-r--r--sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.DomainRegistryFactory17
-rw-r--r--sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java170
-rw-r--r--sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistryTestCase.java85
11 files changed, 0 insertions, 2685 deletions
diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/META-INF/MANIFEST.MF b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/META-INF/MANIFEST.MF
deleted file mode 100644
index 06df55ef38..0000000000
--- a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/META-INF/MANIFEST.MF
+++ /dev/null
@@ -1,28 +0,0 @@
-Manifest-Version: 1.0
-Private-Package: org.apache.tuscany.sca.xsd.impl;version="2.0.0"
-SCA-Version: 1.1
-Bundle-Name: Apache Tuscany SCA Tomcat Tribes Based EndPoint Registry
-Bundle-Vendor: The Apache Software Foundation
-Bundle-Version: 2.0.0
-Bundle-ManifestVersion: 2
-Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt
-Bundle-Description: Apache Tuscany SCA XSD Model
-Bundle-SymbolicName: org.apache.tuscany.sca.endpoint.tribes
-Bundle-DocURL: http://www.apache.org/
-Bundle-RequiredExecutionEnvironment: J2SE-1.5,JavaSE-1.6
-Import-Package: org.apache.catalina.tribes,
- org.apache.catalina.tribes.group,
- org.apache.catalina.tribes.group.interceptors,
- org.apache.catalina.tribes.io,
- org.apache.catalina.tribes.membership,
- org.apache.catalina.tribes.tipis,
- org.apache.catalina.tribes.transport,
- org.apache.catalina.tribes.util,
- org.apache.juli.logging;resolution:=optional,
- org.apache.tuscany.sca.assembly;version="2.0.0",
- org.apache.tuscany.sca.core;version="2.0.0",
- org.apache.tuscany.sca.core.assembly.impl;scope=internal;version="2.0.0";resolution:=optional,
- org.apache.tuscany.sca.management;version="2.0.0",
- org.apache.tuscany.sca.policy;version="2.0.0",
- org.apache.tuscany.sca.runtime;version="2.0.0"
-Export-Package: org.apache.tuscany.sca.endpoint.tribes;version="2.0.0"
diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/pom.xml b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/pom.xml
deleted file mode 100644
index da47f27d4a..0000000000
--- a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/pom.xml
+++ /dev/null
@@ -1,65 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
--->
-<project>
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.tuscany.sca</groupId>
- <artifactId>tuscany-modules</artifactId>
- <version>2.0-Beta2-SNAPSHOT</version>
- <relativePath>../pom.xml</relativePath>
- </parent>
- <artifactId>tuscany-endpoint-tribes</artifactId>
- <name>Apache Tuscany SCA Tomcat Tribes Based EndPoint Registry</name>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.tomcat</groupId>
- <artifactId>tribes</artifactId>
- <!-- DO NOT upgrade to 6.0.20: https://issues.apache.org/bugzilla/show_bug.cgi?id=47419 -->
- <version>6.0.26</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.tuscany.sca</groupId>
- <artifactId>tuscany-core-spi</artifactId>
- <version>2.0-Beta2-SNAPSHOT</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.tuscany.sca</groupId>
- <artifactId>tuscany-core</artifactId>
- <version>2.0-Beta2-SNAPSHOT</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.tuscany.sca</groupId>
- <artifactId>tuscany-deployment</artifactId>
- <version>2.0-Beta2-SNAPSHOT</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.tuscany.sca</groupId>
- <artifactId>tuscany-implementation-java-runtime</artifactId>
- <version>2.0-Beta2-SNAPSHOT</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
-</project>
diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java
deleted file mode 100644
index fe683af025..0000000000
--- a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java
+++ /dev/null
@@ -1,1564 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tuscany.sca.endpoint.tribes;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelListener;
-import org.apache.catalina.tribes.Heartbeat;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.group.Response;
-import org.apache.catalina.tribes.group.RpcCallback;
-import org.apache.catalina.tribes.group.RpcChannel;
-import org.apache.catalina.tribes.io.XByteBuffer;
-import org.apache.catalina.tribes.membership.MemberImpl;
-import org.apache.catalina.tribes.tipis.ReplicatedMapEntry;
-import org.apache.catalina.tribes.util.Arrays;
-import org.apache.juli.logging.Log;
-import org.apache.juli.logging.LogFactory;
-
-/**
- * This file is copied from:
- * https://svn.apache.org/repos/asf/tomcat/tc6.0.x/tags/TOMCAT_6_0_20/java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java
- *
- * We need to intercept the put/remove calls on the base Map to fire events
- */
-public abstract class AbstractReplicatedMap extends MapStore implements RpcCallback, ChannelListener,
- MembershipListener, Heartbeat {
- private static final long serialVersionUID = -2703358020113899074L;
-
- protected static Log log = LogFactory.getLog(AbstractReplicatedMap.class);
-
- /**
- * The default initial capacity - MUST be a power of two.
- */
- public static final int DEFAULT_INITIAL_CAPACITY = 16;
-
- /**
- * The load factor used when none specified in constructor.
- **/
- public static final float DEFAULT_LOAD_FACTOR = 0.75f;
-
- /**
- * Used to identify the map
- */
- final String chset = "ISO-8859-1";
-
- //------------------------------------------------------------------------------
- // INSTANCE VARIABLES
- //------------------------------------------------------------------------------
- protected abstract int getStateMessageType();
-
- /**
- * Timeout for RPC messages, how long we will wait for a reply
- */
- protected transient long rpcTimeout = 5000;
- /**
- * Reference to the channel for sending messages
- */
- protected transient Channel channel;
- /**
- * The RpcChannel to send RPC messages through
- */
- protected transient RpcChannel rpcChannel;
- /**
- * The Map context name makes this map unique, this
- * allows us to have more than one map shared
- * through one channel
- */
- protected transient byte[] mapContextName;
- /**
- * Has the state been transferred
- */
- protected transient boolean stateTransferred = false;
- /**
- * Simple lock object for transfers
- */
- protected transient Object stateMutex = new Object();
- /**
- * A list of members in our map
- */
- protected transient Map<Member, Long> mapMembers = new HashMap<Member, Long>();
- /**
- * Our default send options
- */
- protected transient int channelSendOptions = Channel.SEND_OPTIONS_DEFAULT;
- /**
- * The owner of this map, ala a SessionManager for example
- */
- protected transient MapOwner mapOwner;
- /**
- * External class loaders if serialization and deserialization is to be performed successfully.
- */
- protected transient ClassLoader[] externalLoaders;
-
- /**
- * The node we are currently backing up data to, this index will rotate
- * on a round robin basis
- */
- protected transient int currentNode = 0;
-
- /**
- * Since the map keeps internal membership
- * this is the timeout for a ping message to be responded to
- * If a remote map doesn't respond within this timeframe,
- * its considered dead.
- */
- protected transient long accessTimeout = 5000;
-
- /**
- * Readable string of the mapContextName value
- */
- protected transient String mapname = "";
-
- //------------------------------------------------------------------------------
- // map owner interface
- //------------------------------------------------------------------------------
-
- public static interface MapOwner {
- public void objectMadePrimay(Object key, Object value);
- }
-
- //------------------------------------------------------------------------------
- // CONSTRUCTORS
- //------------------------------------------------------------------------------
-
- /**
- * Creates a new map
- * @param channel The channel to use for communication
- * @param timeout long - timeout for RPC messags
- * @param mapContextName String - unique name for this map, to allow multiple maps per channel
- * @param initialCapacity int - the size of this map, see HashMap
- * @param loadFactor float - load factor, see HashMap
- * @param cls - a list of classloaders to be used for deserialization of objects.
- */
- public AbstractReplicatedMap(MapOwner owner,
- Channel channel,
- long timeout,
- String mapContextName,
- int initialCapacity,
- float loadFactor,
- int channelSendOptions,
- ClassLoader[] cls) {
- super(initialCapacity, loadFactor, 15);
- init(owner, channel, mapContextName, timeout, channelSendOptions, cls);
-
- }
-
- /**
- * Helper methods, wraps a single member in an array
- * @param m Member
- * @return Member[]
- */
- protected Member[] wrap(Member m) {
- if (m == null)
- return new Member[0];
- else
- return new Member[] {m};
- }
-
- /**
- * Initializes the map by creating the RPC channel, registering itself as a channel listener
- * This method is also responsible for initiating the state transfer
- * @param owner Object
- * @param channel Channel
- * @param mapContextName String
- * @param timeout long
- * @param channelSendOptions int
- * @param cls ClassLoader[]
- */
- protected void init(MapOwner owner,
- Channel channel,
- String mapContextName,
- long timeout,
- int channelSendOptions,
- ClassLoader[] cls) {
- log.info("Initializing AbstractReplicatedMap with context name:" + mapContextName);
- this.mapOwner = owner;
- this.externalLoaders = cls;
- this.channelSendOptions = channelSendOptions;
- this.channel = channel;
- this.rpcTimeout = timeout;
-
- try {
- this.mapname = mapContextName;
- //unique context is more efficient if it is stored as bytes
- this.mapContextName = mapContextName.getBytes(chset);
- } catch (UnsupportedEncodingException x) {
- log.warn("Unable to encode mapContextName[" + mapContextName
- + "] using getBytes("
- + chset
- + ") using default getBytes()", x);
- this.mapContextName = mapContextName.getBytes();
- }
- if (log.isTraceEnabled())
- log.trace("Created Lazy Map with name:" + mapContextName
- + ", bytes:"
- + Arrays.toString(this.mapContextName));
-
- //create an rpc channel and add the map as a listener
- this.rpcChannel = new RpcChannel(this.mapContextName, channel, this);
- //add this map as a message listener
- this.channel.addChannelListener(this);
- //listen for membership notifications
- this.channel.addMembershipListener(this);
-
- try {
- //broadcast our map, this just notifies other members of our existence
- broadcast(MapMessage.MSG_INIT, true);
- //transfer state from another map
- transferState();
- //state is transferred, we are ready for messaging
- broadcast(MapMessage.MSG_START, true);
- } catch (ChannelException x) {
- log.warn("Unable to send map start message.");
- throw new RuntimeException("Unable to start replicated map.", x);
- }
- }
-
- /**
- * Sends a ping out to all the members in the cluster, not just map members
- * that this map is alive.
- * @param timeout long
- * @throws ChannelException
- */
- protected void ping(long timeout) throws ChannelException {
- //send out a map membership message, only wait for the first reply
- MapMessage msg =
- new MapMessage(this.mapContextName, MapMessage.MSG_INIT, false, null, null, null, channel
- .getLocalMember(false), null);
- if (channel.getMembers().length > 0) {
- //send a ping, wait for all nodes to reply
- Response[] resp =
- rpcChannel.send(channel.getMembers(),
- msg,
- RpcChannel.ALL_REPLY,
- (channelSendOptions),
- (int)accessTimeout);
- for (int i = 0; i < resp.length; i++) {
- memberAlive(resp[i].getSource());
- } //for
- }
- //update our map of members, expire some if we didn't receive a ping back
- synchronized (mapMembers) {
- Iterator it = mapMembers.entrySet().iterator();
- long now = System.currentTimeMillis();
- while (it.hasNext()) {
- Map.Entry entry = (Map.Entry)it.next();
- long access = ((Long)entry.getValue()).longValue();
- if ((now - access) > timeout) {
- it.remove();
- memberDisappeared((Member)entry.getKey());
- }
- }
- }//synch
- }
-
- /**
- * We have received a member alive notification
- * @param member Member
- */
- protected void memberAlive(Member member) {
- synchronized (mapMembers) {
- if (!mapMembers.containsKey(member)) {
- mapMemberAdded(member);
- } //end if
- mapMembers.put(member, new Long(System.currentTimeMillis()));
- }
- }
-
- /**
- * Helper method to broadcast a message to all members in a channel
- * @param msgtype int
- * @param rpc boolean
- * @throws ChannelException
- */
- protected void broadcast(int msgtype, boolean rpc) throws ChannelException {
- //send out a map membership message, only wait for the first reply
- MapMessage msg =
- new MapMessage(this.mapContextName, msgtype, false, null, null, null, channel.getLocalMember(false), null);
- if (rpc) {
- Response[] resp =
- rpcChannel.send(channel.getMembers(), msg, RpcChannel.FIRST_REPLY, (channelSendOptions), rpcTimeout);
- for (int i = 0; i < resp.length; i++) {
- mapMemberAdded(resp[i].getSource());
- messageReceived(resp[i].getMessage(), resp[i].getSource());
- }
- } else {
- channel.send(channel.getMembers(), msg, channelSendOptions);
- }
- }
-
- public void breakdown() {
- finalize();
- }
-
- public void finalize() {
- try {
- broadcast(MapMessage.MSG_STOP, false);
- } catch (Exception ignore) {
- }
- //cleanup
- if (this.rpcChannel != null) {
- this.rpcChannel.breakdown();
- }
- if (this.channel != null) {
- this.channel.removeChannelListener(this);
- this.channel.removeMembershipListener(this);
- }
- this.rpcChannel = null;
- this.channel = null;
- this.mapMembers.clear();
- super.clear();
- this.stateTransferred = false;
- this.externalLoaders = null;
- }
-
- public int hashCode() {
- return Arrays.hashCode(this.mapContextName);
- }
-
- public boolean equals(Object o) {
- if (o == null)
- return false;
- if (!(o instanceof AbstractReplicatedMap))
- return false;
- if (!(o.getClass().equals(this.getClass())))
- return false;
- AbstractReplicatedMap other = (AbstractReplicatedMap)o;
- return Arrays.equals(mapContextName, other.mapContextName);
- }
-
- //------------------------------------------------------------------------------
- // GROUP COM INTERFACES
- //------------------------------------------------------------------------------
- public Member[] getMapMembers(Map<Member, Long> members) {
- synchronized (members) {
- Member[] result = new Member[members.size()];
- members.keySet().toArray(result);
- return result;
- }
- }
-
- public Member[] getMapMembers() {
- return getMapMembers(this.mapMembers);
- }
-
- public Member[] getMapMembersExcl(Member[] exclude) {
- synchronized (mapMembers) {
- Map<Member, Long> list = (Map<Member, Long>)((HashMap<Member, Long>)mapMembers).clone();
- for (int i = 0; i < exclude.length; i++)
- list.remove(exclude[i]);
- return getMapMembers(list);
- }
- }
-
- /**
- * Replicates any changes to the object since the last time
- * The object has to be primary, ie, if the object is a proxy or a backup, it will not be replicated<br>
- * @param complete - if set to true, the object is replicated to its backup
- * if set to false, only objects that implement ReplicatedMapEntry and the isDirty() returns true will
- * be replicated
- */
- public void replicate(Object key, boolean complete) {
- if (log.isTraceEnabled())
- log.trace("Replicate invoked on key:" + key);
- MapEntry entry = (MapEntry)super.get(key);
- if (entry == null)
- return;
- if (!entry.isSerializable())
- return;
- if (entry != null && entry.isPrimary() && entry.getBackupNodes() != null && entry.getBackupNodes().length > 0) {
- Object value = entry.getValue();
- //check to see if we need to replicate this object isDirty()||complete
- boolean repl = complete || ((value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDirty());
-
- if (!repl) {
- if (log.isTraceEnabled())
- log.trace("Not replicating:" + key + ", no change made");
-
- return;
- }
- //check to see if the message is diffable
- boolean diff = ((value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDiffable());
- MapMessage msg = null;
- if (diff) {
- ReplicatedMapEntry rentry = (ReplicatedMapEntry)entry.getValue();
- try {
- rentry.lock();
- //construct a diff message
- msg =
- new MapMessage(mapContextName, MapMessage.MSG_BACKUP, true, (Serializable)entry.getKey(), null,
- rentry.getDiff(), entry.getPrimary(), entry.getBackupNodes());
- } catch (IOException x) {
- log.error("Unable to diff object. Will replicate the entire object instead.", x);
- } finally {
- rentry.unlock();
- }
-
- }
- if (msg == null) {
- //construct a complete
- msg =
- new MapMessage(mapContextName, MapMessage.MSG_BACKUP, false, (Serializable)entry.getKey(),
- (Serializable)entry.getValue(), null, entry.getPrimary(), entry.getBackupNodes());
-
- }
- try {
- if (channel != null && entry.getBackupNodes() != null && entry.getBackupNodes().length > 0) {
- channel.send(entry.getBackupNodes(), msg, channelSendOptions);
- }
- } catch (ChannelException x) {
- log.error("Unable to replicate data.", x);
- }
- } //end if
-
- }
-
- /**
- * This can be invoked by a periodic thread to replicate out any changes.
- * For maps that don't store objects that implement ReplicatedMapEntry, this
- * method should be used infrequently to avoid large amounts of data transfer
- * @param complete boolean
- */
- public void replicate(boolean complete) {
- Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry<Object, Object> e = i.next();
- replicate(e.getKey(), complete);
- } //while
-
- }
-
- public void transferState() {
- try {
- Member[] members = getMapMembers();
- Member backup = members.length > 0 ? (Member)members[0] : null;
- if (backup != null) {
- MapMessage msg =
- new MapMessage(mapContextName, getStateMessageType(), false, null, null, null, null, null);
- Response[] resp =
- rpcChannel.send(new Member[] {backup}, msg, RpcChannel.FIRST_REPLY, channelSendOptions, rpcTimeout);
- if (resp.length > 0) {
- synchronized (stateMutex) {
- msg = (MapMessage)resp[0].getMessage();
- msg.deserialize(getExternalLoaders());
- List list = (List)msg.getValue();
- for (int i = 0; i < list.size(); i++) {
- messageReceived((Serializable)list.get(i), resp[0].getSource());
- } //for
- }
- } else {
- log.warn("Transfer state, 0 replies, probably a timeout.");
- }
- }
- } catch (ChannelException x) {
- log.error("Unable to transfer LazyReplicatedMap state.", x);
- } catch (IOException x) {
- log.error("Unable to transfer LazyReplicatedMap state.", x);
- } catch (ClassNotFoundException x) {
- log.error("Unable to transfer LazyReplicatedMap state.", x);
- }
- stateTransferred = true;
- }
-
- /**
- * @todo implement state transfer
- * @param msg Serializable
- * @return Serializable - null if no reply should be sent
- */
- public Serializable replyRequest(Serializable msg, final Member sender) {
- if (!(msg instanceof MapMessage))
- return null;
- MapMessage mapmsg = (MapMessage)msg;
-
- //map init request
- if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
- mapmsg.setPrimary(channel.getLocalMember(false));
- return mapmsg;
- }
-
- //map start request
- if (mapmsg.getMsgType() == MapMessage.MSG_START) {
- mapmsg.setPrimary(channel.getLocalMember(false));
- mapMemberAdded(sender);
- return mapmsg;
- }
-
- //backup request
- if (mapmsg.getMsgType() == MapMessage.MSG_RETRIEVE_BACKUP) {
- MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
- if (entry == null || (!entry.isSerializable()))
- return null;
- mapmsg.setValue((Serializable)entry.getValue());
- return mapmsg;
- }
-
- //state transfer request
- if (mapmsg.getMsgType() == MapMessage.MSG_STATE || mapmsg.getMsgType() == MapMessage.MSG_STATE_COPY) {
- synchronized (stateMutex) { //make sure we dont do two things at the same time
- ArrayList<MapMessage> list = new ArrayList<MapMessage>();
- Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry<Object, Object> e = i.next();
- MapEntry entry = (MapEntry)super.get(e.getKey());
- if (entry != null && entry.isSerializable()) {
- boolean copy = (mapmsg.getMsgType() == MapMessage.MSG_STATE_COPY);
- MapMessage me =
- new MapMessage(mapContextName, copy ? MapMessage.MSG_COPY : MapMessage.MSG_PROXY, false,
- (Serializable)entry.getKey(), copy ? (Serializable)entry.getValue() : null,
- null, entry.getPrimary(), entry.getBackupNodes());
- list.add(me);
- }
- }
- mapmsg.setValue(list);
- return mapmsg;
-
- } //synchronized
- }
-
- return null;
-
- }
-
- /**
- * If the reply has already been sent to the requesting thread,
- * the rpc callback can handle any data that comes in after the fact.
- * @param msg Serializable
- * @param sender Member
- */
- public void leftOver(Serializable msg, Member sender) {
- //left over membership messages
- if (!(msg instanceof MapMessage))
- return;
-
- MapMessage mapmsg = (MapMessage)msg;
- try {
- mapmsg.deserialize(getExternalLoaders());
- if (mapmsg.getMsgType() == MapMessage.MSG_START) {
- mapMemberAdded(mapmsg.getPrimary());
- } else if (mapmsg.getMsgType() == MapMessage.MSG_INIT) {
- memberAlive(mapmsg.getPrimary());
- }
- } catch (IOException x) {
- log.error("Unable to deserialize MapMessage.", x);
- } catch (ClassNotFoundException x) {
- log.error("Unable to deserialize MapMessage.", x);
- }
- }
-
- public void messageReceived(Serializable msg, Member sender) {
- if (!(msg instanceof MapMessage))
- return;
-
- MapMessage mapmsg = (MapMessage)msg;
- if (log.isTraceEnabled()) {
- log.trace("Map[" + mapname + "] received message:" + mapmsg);
- }
-
- try {
- mapmsg.deserialize(getExternalLoaders());
- } catch (IOException x) {
- log.error("Unable to deserialize MapMessage.", x);
- return;
- } catch (ClassNotFoundException x) {
- log.error("Unable to deserialize MapMessage.", x);
- return;
- }
- if (log.isTraceEnabled())
- log.trace("Map message received from:" + sender.getName() + " msg:" + mapmsg);
- if (mapmsg.getMsgType() == MapMessage.MSG_START) {
- mapMemberAdded(mapmsg.getPrimary());
- }
-
- if (mapmsg.getMsgType() == MapMessage.MSG_STOP) {
- memberDisappeared(mapmsg.getPrimary());
- }
-
- if (mapmsg.getMsgType() == MapMessage.MSG_PROXY) {
- MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
- if (entry == null) {
- entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
- entry.setBackup(false);
- entry.setProxy(true);
- entry.setBackupNodes(mapmsg.getBackupNodes());
- entry.setPrimary(mapmsg.getPrimary());
- super.put(entry.getKey(), entry);
- } else {
- entry.setProxy(true);
- entry.setBackup(false);
- entry.setBackupNodes(mapmsg.getBackupNodes());
- entry.setPrimary(mapmsg.getPrimary());
- }
- }
-
- if (mapmsg.getMsgType() == MapMessage.MSG_REMOVE) {
- super.remove(mapmsg.getKey());
- }
-
- if (mapmsg.getMsgType() == MapMessage.MSG_BACKUP || mapmsg.getMsgType() == MapMessage.MSG_COPY) {
- MapEntry entry = (MapEntry)super.get(mapmsg.getKey());
- if (entry == null) {
- entry = new MapEntry(mapmsg.getKey(), mapmsg.getValue());
- entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
- entry.setProxy(false);
- entry.setBackupNodes(mapmsg.getBackupNodes());
- entry.setPrimary(mapmsg.getPrimary());
- if (mapmsg.getValue() != null && mapmsg.getValue() instanceof ReplicatedMapEntry) {
- ((ReplicatedMapEntry)mapmsg.getValue()).setOwner(getMapOwner());
- }
- } else {
- entry.setBackup(mapmsg.getMsgType() == MapMessage.MSG_BACKUP);
- entry.setProxy(false);
- entry.setBackupNodes(mapmsg.getBackupNodes());
- entry.setPrimary(mapmsg.getPrimary());
- if (entry.getValue() instanceof ReplicatedMapEntry) {
- ReplicatedMapEntry diff = (ReplicatedMapEntry)entry.getValue();
- if (mapmsg.isDiff()) {
- try {
- diff.lock();
- diff.applyDiff(mapmsg.getDiffValue(), 0, mapmsg.getDiffValue().length);
- } catch (Exception x) {
- log.error("Unable to apply diff to key:" + entry.getKey(), x);
- } finally {
- diff.unlock();
- }
- } else {
- if (mapmsg.getValue() != null)
- entry.setValue(mapmsg.getValue());
- ((ReplicatedMapEntry)entry.getValue()).setOwner(getMapOwner());
- } //end if
- } else if (mapmsg.getValue() instanceof ReplicatedMapEntry) {
- ReplicatedMapEntry re = (ReplicatedMapEntry)mapmsg.getValue();
- re.setOwner(getMapOwner());
- entry.setValue(re);
- } else {
- if (mapmsg.getValue() != null)
- entry.setValue(mapmsg.getValue());
- } //end if
- } //end if
- super.put(entry.getKey(), entry);
- } //end if
- }
-
- public boolean accept(Serializable msg, Member sender) {
- boolean result = false;
- if (msg instanceof MapMessage) {
- if (log.isTraceEnabled())
- log.trace("Map[" + mapname + "] accepting...." + msg);
- result = Arrays.equals(mapContextName, ((MapMessage)msg).getMapId());
- if (log.isTraceEnabled())
- log.trace("Msg[" + mapname + "] accepted[" + result + "]...." + msg);
- }
- return result;
- }
-
- public void mapMemberAdded(Member member) {
- Member self = getChannel().getLocalMember(false);
- if (member.equals(self))
- return;
- boolean memberAdded = false;
- //select a backup node if we don't have one
- synchronized (mapMembers) {
- if (!mapMembers.containsKey(member)) {
- mapMembers.put(member, new Long(System.currentTimeMillis()));
- memberAdded = true;
- }
- }
- if (memberAdded) {
- synchronized (stateMutex) {
- Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry<Object, Object> e = i.next();
- MapEntry entry = (MapEntry)super.get(e.getKey());
- if (entry == null)
- continue;
- // if (entry.isPrimary() && (entry.getBackupNodes() == null || entry.getBackupNodes().length == 0)) {
- // [rfeng] Change the behavior to replicate to all nodes
- if (entry.isPrimary() && self.equals(entry.getPrimary())) {
- try {
- Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
- entry.setBackupNodes(backup);
- entry.setPrimary(self);
- } catch (ChannelException x) {
- log.error("Unable to select backup node.", x);
- } //catch
- } //end if
- } //while
- } //synchronized
- }//end if
- }
-
- public boolean inSet(Member m, Member[] set) {
- if (set == null)
- return false;
- boolean result = false;
- for (int i = 0; i < set.length && (!result); i++)
- if (m.equals(set[i]))
- result = true;
- return result;
- }
-
- public Member[] excludeFromSet(Member[] mbrs, Member[] set) {
- List<Member> result = new ArrayList<Member>();
- for (int i = 0; i < set.length; i++) {
- boolean include = true;
- for (int j = 0; j < mbrs.length; j++)
- if (mbrs[j].equals(set[i]))
- include = false;
- if (include)
- result.add(set[i]);
- }
- return (Member[])result.toArray(new Member[result.size()]);
- }
-
- public void memberAdded(Member member) {
- //do nothing
- }
-
- public void memberDisappeared(Member member) {
- boolean removed = false;
- synchronized (mapMembers) {
- removed = (mapMembers.remove(member) != null);
- if (!removed) {
- if (log.isDebugEnabled())
- log.debug("Member[" + member + "] disappeared, but was not present in the map.");
- return; //the member was not part of our map.
- }
- }
-
- Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry<Object, Object> e = i.next();
- MapEntry entry = (MapEntry)super.get(e.getKey());
- if (entry == null)
- continue;
- if (entry.isPrimary() && inSet(member, entry.getBackupNodes())) {
- if (log.isDebugEnabled())
- log.debug("[1] Primary choosing a new backup");
- try {
- Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
- entry.setBackupNodes(backup);
- entry.setPrimary(channel.getLocalMember(false));
- } catch (ChannelException x) {
- log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x);
- }
- } else if (member.equals(entry.getPrimary())) {
- if (log.isDebugEnabled())
- log.debug("[2] Primary disappeared");
- entry.setPrimary(null);
- } //end if
-
- if (entry.isProxy() && entry.getPrimary() == null
- && entry.getBackupNodes() != null
- && entry.getBackupNodes().length == 1
- && entry.getBackupNodes()[0].equals(member)) {
- //remove proxies that have no backup nor primaries
- if (log.isDebugEnabled())
- log.debug("[3] Removing orphaned proxy");
- i.remove();
- } else if (entry.getPrimary() == null && entry.isBackup()
- && entry.getBackupNodes() != null
- && entry.getBackupNodes().length == 1
- && entry.getBackupNodes()[0].equals(channel.getLocalMember(false))) {
- try {
- if (log.isDebugEnabled())
- log.debug("[4] Backup becoming primary");
- entry.setPrimary(channel.getLocalMember(false));
- entry.setBackup(false);
- entry.setProxy(false);
- Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
- entry.setBackupNodes(backup);
- if (mapOwner != null)
- mapOwner.objectMadePrimay(entry.getKey(), entry.getValue());
-
- } catch (ChannelException x) {
- log.error("Unable to relocate[" + entry.getKey() + "] to a new backup node", x);
- }
- }
-
- } //while
- }
-
- public int getNextBackupIndex() {
- int size = mapMembers.size();
- if (mapMembers.size() == 0)
- return -1;
- int node = currentNode++;
- if (node >= size) {
- node = 0;
- currentNode = 0;
- }
- return node;
- }
-
- public Member getNextBackupNode() {
- Member[] members = getMapMembers();
- int node = getNextBackupIndex();
- if (members.length == 0 || node == -1)
- return null;
- if (node >= members.length)
- node = 0;
- return members[node];
- }
-
- protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException;
-
- public void heartbeat() {
- try {
- ping(accessTimeout);
- } catch (Exception x) {
- log.error("Unable to send AbstractReplicatedMap.ping message", x);
- }
- }
-
- //------------------------------------------------------------------------------
- // METHODS TO OVERRIDE
- //------------------------------------------------------------------------------
-
- /**
- * Removes an object from this map, it will also remove it from
- *
- * @param key Object
- * @return Object
- */
- public Object remove(Object key) {
- return remove(key, true);
- }
-
- public Object remove(Object key, boolean notify) {
- MapEntry entry = (MapEntry)super.remove(key);
-
- try {
- if (getMapMembers().length > 0 && notify) {
- MapMessage msg =
- new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable)key, null, null,
- null, null);
- getChannel().send(getMapMembers(), msg, getChannelSendOptions());
- }
- } catch (ChannelException x) {
- log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation", x);
- }
- return entry != null ? entry.getValue() : null;
- }
-
- public MapEntry getInternal(Object key) {
- return (MapEntry)super.get(key);
- }
-
- public Object get(Object key) {
- MapEntry entry = (MapEntry)super.get(key);
- if (log.isTraceEnabled())
- log.trace("Requesting id:" + key + " entry:" + entry);
- if (entry == null)
- return null;
- if (!entry.isPrimary()) {
- //if the message is not primary, we need to retrieve the latest value
- try {
- Member[] backup = null;
- MapMessage msg = null;
- if (!entry.isBackup()) {
- //make sure we don't retrieve from ourselves
- msg =
- new MapMessage(getMapContextName(), MapMessage.MSG_RETRIEVE_BACKUP, false, (Serializable)key,
- null, null, null, null);
- Response[] resp =
- getRpcChannel().send(entry.getBackupNodes(),
- msg,
- RpcChannel.FIRST_REPLY,
- Channel.SEND_OPTIONS_DEFAULT,
- getRpcTimeout());
- if (resp == null || resp.length == 0) {
- //no responses
- log.warn("Unable to retrieve remote object for key:" + key);
- return null;
- }
- msg = (MapMessage)resp[0].getMessage();
- msg.deserialize(getExternalLoaders());
- backup = entry.getBackupNodes();
- if (entry.getValue() instanceof ReplicatedMapEntry) {
- ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
- val.setOwner(getMapOwner());
- }
- if (msg.getValue() != null)
- entry.setValue(msg.getValue());
- }
- if (entry.isBackup()) {
- //select a new backup node
- backup = publishEntryInfo(key, entry.getValue());
- } else if (entry.isProxy()) {
- //invalidate the previous primary
- msg =
- new MapMessage(getMapContextName(), MapMessage.MSG_PROXY, false, (Serializable)key, null, null,
- channel.getLocalMember(false), backup);
- Member[] dest = getMapMembersExcl(backup);
- if (dest != null && dest.length > 0) {
- getChannel().send(dest, msg, getChannelSendOptions());
- }
- }
- entry.setPrimary(channel.getLocalMember(false));
- entry.setBackupNodes(backup);
- entry.setBackup(false);
- entry.setProxy(false);
-
- } catch (Exception x) {
- log.error("Unable to replicate out data for a LazyReplicatedMap.get operation", x);
- return null;
- }
- }
- if (log.isTraceEnabled())
- log.trace("Requesting id:" + key + " result:" + entry.getValue());
- if (entry.getValue() != null && entry.getValue() instanceof ReplicatedMapEntry) {
- ReplicatedMapEntry val = (ReplicatedMapEntry)entry.getValue();
- //hack, somehow this is not being set above
- val.setOwner(getMapOwner());
-
- }
- return entry.getValue();
- }
-
- protected void printMap(String header) {
- try {
- System.out.println("\nDEBUG MAP:" + header);
- System.out.println("Map[" + new String(mapContextName, chset) + ", Map Size:" + super.size());
- Member[] mbrs = getMapMembers();
- for (int i = 0; i < mbrs.length; i++) {
- System.out.println("Mbr[" + (i + 1) + "=" + mbrs[i].getName());
- }
- Iterator i = super.entrySet().iterator();
- int cnt = 0;
-
- while (i.hasNext()) {
- Map.Entry e = (Map.Entry)i.next();
- System.out.println((++cnt) + ". " + super.get(e.getKey()));
- }
- System.out.println("EndMap]\n\n");
- } catch (Exception ignore) {
- ignore.printStackTrace();
- }
- }
-
- /**
- * Returns true if the key has an entry in the map.
- * The entry can be a proxy or a backup entry, invoking <code>get(key)</code>
- * will make this entry primary for the group
- * @param key Object
- * @return boolean
- */
- public boolean containsKey(Object key) {
- return super.containsKey(key);
- }
-
- public Object put(Object key, Object value) {
- return put(key, value, true);
- }
-
- public Object put(Object key, Object value, boolean notify) {
- MapEntry entry = new MapEntry(key, value);
- entry.setBackup(false);
- entry.setProxy(false);
- entry.setPrimary(channel.getLocalMember(false));
-
- Object old = null;
-
- //make sure that any old values get removed
- if (containsKey(key))
- old = remove(key);
- try {
- if (notify) {
- Member[] backup = publishEntryInfo(key, value);
- entry.setBackupNodes(backup);
- }
- } catch (ChannelException x) {
- log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x);
- }
- super.put(key, entry);
- return old;
- }
-
- /**
- * Copies all values from one map to this instance
- * @param m Map
- */
- public void putAll(Map m) {
- Iterator i = m.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry entry = (Map.Entry)i.next();
- put(entry.getKey(), entry.getValue());
- }
- }
-
- public void clear() {
- clear(true);
- }
-
- public void clear(boolean notify) {
- if (notify) {
- //only delete active keys
- Iterator<Object> keys = keySet().iterator();
- while (keys.hasNext())
- remove(keys.next());
- } else {
- super.clear();
- }
- }
-
- public boolean containsValue(Object value) {
- if (value == null) {
- return super.containsValue(value);
- } else {
- Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry<Object, Object> e = i.next();
- MapEntry entry = (MapEntry)super.get(e.getKey());
- if (entry != null && entry.isPrimary() && value.equals(entry.getValue()))
- return true;
- }//while
- return false;
- }//end if
- }
-
- public Object clone() {
- throw new UnsupportedOperationException("This operation is not valid on a replicated map");
- }
-
- /**
- * Returns the entire contents of the map
- * Map.Entry.getValue() will return a LazyReplicatedMap.MapEntry object containing all the information
- * about the object.
- * @return Set
- */
- public Set<Map.Entry<Object, Object>> entrySetFull() {
- return super.entrySet();
- }
-
- public Set<Object> keySetFull() {
- return super.keySet();
- }
-
- public int sizeFull() {
- return super.size();
- }
-
- public Set<Map.Entry<Object, Object>> entrySet() {
- Set<Map.Entry<Object, Object>> set = new LinkedHashSet<Map.Entry<Object, Object>>(super.size());
- Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry<Object, Object> e = i.next();
- Object key = e.getKey();
- MapEntry entry = (MapEntry)super.get(key);
- if (entry != null && entry.isPrimary()) {
- set.add(new MapEntry(key, entry.getValue()));
- }
- }
- return Collections.unmodifiableSet(set);
- }
-
- public Set<Object> keySet() {
- //todo implement
- //should only return keys where this is active.
- LinkedHashSet<Object> set = new LinkedHashSet<Object>(super.size());
- Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry<Object, Object> e = i.next();
- Object key = e.getKey();
- MapEntry entry = (MapEntry)super.get(key);
- if (entry != null && entry.isPrimary())
- set.add(key);
- }
- return Collections.unmodifiableSet(set);
-
- }
-
- public int size() {
- //todo, implement a counter variable instead
- //only count active members in this node
- int counter = 0;
- Iterator<Map.Entry<Object, Object>> it = super.entrySet().iterator();
- while (it != null && it.hasNext()) {
- Map.Entry<Object, Object> e = it.next();
- if (e != null) {
- MapEntry entry = (MapEntry)super.get(e.getKey());
- if (entry != null && entry.isPrimary() && entry.getValue() != null)
- counter++;
- }
- }
- return counter;
- }
-
- protected boolean removeEldestEntry(Map.Entry<Object, Object> eldest) {
- return false;
- }
-
- public boolean isEmpty() {
- return size() == 0;
- }
-
- public Collection<Object> values() {
- ArrayList<Object> values = new ArrayList<Object>();
- Iterator<Map.Entry<Object, Object>> i = super.entrySet().iterator();
- while (i.hasNext()) {
- Map.Entry<Object, Object> e = i.next();
- MapEntry entry = (MapEntry)super.get(e.getKey());
- if (entry != null && entry.isPrimary() && entry.getValue() != null)
- values.add(entry.getValue());
- }
- return Collections.unmodifiableCollection(values);
- }
-
- //------------------------------------------------------------------------------
- // Map Entry class
- //------------------------------------------------------------------------------
- public static class MapEntry implements Map.Entry<Object, Object> {
- private boolean backup;
- private boolean proxy;
- private Member[] backupNodes;
- private Member primary;
- private Object key;
- private Object value;
-
- public MapEntry(Object key, Object value) {
- setKey(key);
- setValue(value);
-
- }
-
- public boolean isKeySerializable() {
- return (key == null) || (key instanceof Serializable);
- }
-
- public boolean isValueSerializable() {
- return (value == null) || (value instanceof Serializable);
- }
-
- public boolean isSerializable() {
- return isKeySerializable() && isValueSerializable();
- }
-
- public boolean isBackup() {
- return backup;
- }
-
- public void setBackup(boolean backup) {
- this.backup = backup;
- }
-
- public boolean isProxy() {
- return proxy;
- }
-
- public boolean isPrimary() {
- return ((!proxy) && (!backup));
- }
-
- public void setProxy(boolean proxy) {
- this.proxy = proxy;
- }
-
- public boolean isDiffable() {
- return (value instanceof ReplicatedMapEntry) && ((ReplicatedMapEntry)value).isDiffable();
- }
-
- public void setBackupNodes(Member[] nodes) {
- this.backupNodes = nodes;
- }
-
- public Member[] getBackupNodes() {
- return backupNodes;
- }
-
- public void setPrimary(Member m) {
- primary = m;
- }
-
- public Member getPrimary() {
- return primary;
- }
-
- public Object getValue() {
- return value;
- }
-
- public Object setValue(Object value) {
- Object old = this.value;
- this.value = value;
- return old;
- }
-
- public Object getKey() {
- return key;
- }
-
- public Object setKey(Object key) {
- Object old = this.key;
- this.key = key;
- return old;
- }
-
- public int hashCode() {
- return key.hashCode();
- }
-
- public boolean equals(Object o) {
- return key.equals(o);
- }
-
- /**
- * apply a diff, or an entire object
- * @param data byte[]
- * @param offset int
- * @param length int
- * @param diff boolean
- * @throws IOException
- * @throws ClassNotFoundException
- */
- public void apply(byte[] data, int offset, int length, boolean diff) throws IOException, ClassNotFoundException {
- if (isDiffable() && diff) {
- ReplicatedMapEntry rentry = (ReplicatedMapEntry)value;
- try {
- rentry.lock();
- rentry.applyDiff(data, offset, length);
- } finally {
- rentry.unlock();
- }
- } else if (length == 0) {
- value = null;
- proxy = true;
- } else {
- value = XByteBuffer.deserialize(data, offset, length);
- }
- }
-
- public String toString() {
- StringBuffer buf = new StringBuffer("MapEntry[key:");
- buf.append(getKey()).append("; ");
- buf.append("value:").append(getValue()).append("; ");
- buf.append("primary:").append(isPrimary()).append("; ");
- buf.append("backup:").append(isBackup()).append("; ");
- buf.append("proxy:").append(isProxy()).append(";]");
- return buf.toString();
- }
-
- }
-
- //------------------------------------------------------------------------------
- // map message to send to and from other maps
- //------------------------------------------------------------------------------
-
- public static class MapMessage implements Serializable {
- private static final long serialVersionUID = -7847288807489375686L;
- public static final int MSG_BACKUP = 1;
- public static final int MSG_RETRIEVE_BACKUP = 2;
- public static final int MSG_PROXY = 3;
- public static final int MSG_REMOVE = 4;
- public static final int MSG_STATE = 5;
- public static final int MSG_START = 6;
- public static final int MSG_STOP = 7;
- public static final int MSG_INIT = 8;
- public static final int MSG_COPY = 9;
- public static final int MSG_STATE_COPY = 10;
-
- private byte[] mapId;
- private int msgtype;
- private boolean diff;
- private transient Serializable key;
- private transient Serializable value;
- private byte[] valuedata;
- private byte[] keydata;
- private byte[] diffvalue;
- private Member[] nodes;
- private Member primary;
-
- public String toString() {
- StringBuffer buf = new StringBuffer("MapMessage[context=");
- buf.append(new String(mapId));
- buf.append("; type=");
- buf.append(getTypeDesc());
- buf.append("; key=");
- buf.append(key);
- buf.append("; value=");
- buf.append(value);
- return buf.toString();
- }
-
- public String getTypeDesc() {
- switch (msgtype) {
- case MSG_BACKUP:
- return "MSG_BACKUP";
- case MSG_RETRIEVE_BACKUP:
- return "MSG_RETRIEVE_BACKUP";
- case MSG_PROXY:
- return "MSG_PROXY";
- case MSG_REMOVE:
- return "MSG_REMOVE";
- case MSG_STATE:
- return "MSG_STATE";
- case MSG_START:
- return "MSG_START";
- case MSG_STOP:
- return "MSG_STOP";
- case MSG_INIT:
- return "MSG_INIT";
- case MSG_STATE_COPY:
- return "MSG_STATE_COPY";
- case MSG_COPY:
- return "MSG_COPY";
- default:
- return "UNKNOWN";
- }
- }
-
- public MapMessage() {
- }
-
- public MapMessage(byte[] mapId,
- int msgtype,
- boolean diff,
- Serializable key,
- Serializable value,
- byte[] diffvalue,
- Member primary,
- Member[] nodes) {
- this.mapId = mapId;
- this.msgtype = msgtype;
- this.diff = diff;
- this.key = key;
- this.value = value;
- this.diffvalue = diffvalue;
- this.nodes = nodes;
- this.primary = primary;
- setValue(value);
- setKey(key);
- }
-
- public void deserialize(ClassLoader[] cls) throws IOException, ClassNotFoundException {
- key(cls);
- value(cls);
- }
-
- public int getMsgType() {
- return msgtype;
- }
-
- public boolean isDiff() {
- return diff;
- }
-
- public Serializable getKey() {
- try {
- return key(null);
- } catch (Exception x) {
- log.error("Deserialization error of the MapMessage.key", x);
- return null;
- }
- }
-
- public Serializable key(ClassLoader[] cls) throws IOException, ClassNotFoundException {
- if (key != null)
- return key;
- if (keydata == null || keydata.length == 0)
- return null;
- key = XByteBuffer.deserialize(keydata, 0, keydata.length, cls);
- keydata = null;
- return key;
- }
-
- public byte[] getKeyData() {
- return keydata;
- }
-
- public Serializable getValue() {
- try {
- return value(null);
- } catch (Exception x) {
- log.error("Deserialization error of the MapMessage.value", x);
- return null;
- }
- }
-
- public Serializable value(ClassLoader[] cls) throws IOException, ClassNotFoundException {
- if (value != null)
- return value;
- if (valuedata == null || valuedata.length == 0)
- return null;
- value = XByteBuffer.deserialize(valuedata, 0, valuedata.length, cls);
- valuedata = null;
- ;
- return value;
- }
-
- public byte[] getValueData() {
- return valuedata;
- }
-
- public byte[] getDiffValue() {
- return diffvalue;
- }
-
- public Member[] getBackupNodes() {
- return nodes;
- }
-
- private void setBackUpNodes(Member[] nodes) {
- this.nodes = nodes;
- }
-
- public Member getPrimary() {
- return primary;
- }
-
- private void setPrimary(Member m) {
- primary = m;
- }
-
- public byte[] getMapId() {
- return mapId;
- }
-
- public void setValue(Serializable value) {
- try {
- if (value != null)
- valuedata = XByteBuffer.serialize(value);
- this.value = value;
- } catch (IOException x) {
- throw new RuntimeException(x);
- }
- }
-
- public void setKey(Serializable key) {
- try {
- if (key != null)
- keydata = XByteBuffer.serialize(key);
- this.key = key;
- } catch (IOException x) {
- throw new RuntimeException(x);
- }
- }
-
- protected Member[] readMembers(ObjectInput in) throws IOException, ClassNotFoundException {
- int nodecount = in.readInt();
- Member[] members = new Member[nodecount];
- for (int i = 0; i < members.length; i++) {
- byte[] d = new byte[in.readInt()];
- in.read(d);
- if (d.length > 0)
- members[i] = MemberImpl.getMember(d);
- }
- return members;
- }
-
- protected void writeMembers(ObjectOutput out, Member[] members) throws IOException {
- if (members == null)
- members = new Member[0];
- out.writeInt(members.length);
- for (int i = 0; i < members.length; i++) {
- if (members[i] != null) {
- byte[] d = members[i] != null ? ((MemberImpl)members[i]).getData(false) : new byte[0];
- out.writeInt(d.length);
- out.write(d);
- }
- }
- }
-
- /**
- * shallow clone
- * @return Object
- */
- public Object clone() {
- MapMessage msg =
- new MapMessage(this.mapId, this.msgtype, this.diff, this.key, this.value, this.diffvalue, this.primary,
- this.nodes);
- msg.keydata = this.keydata;
- msg.valuedata = this.valuedata;
- return msg;
- }
- } //MapMessage
-
- public Channel getChannel() {
- return channel;
- }
-
- public byte[] getMapContextName() {
- return mapContextName;
- }
-
- public RpcChannel getRpcChannel() {
- return rpcChannel;
- }
-
- public long getRpcTimeout() {
- return rpcTimeout;
- }
-
- public Object getStateMutex() {
- return stateMutex;
- }
-
- public boolean isStateTransferred() {
- return stateTransferred;
- }
-
- public MapOwner getMapOwner() {
- return mapOwner;
- }
-
- public ClassLoader[] getExternalLoaders() {
- return externalLoaders;
- }
-
- public int getChannelSendOptions() {
- return channelSendOptions;
- }
-
- public long getAccessTimeout() {
- return accessTimeout;
- }
-
- public void setMapOwner(MapOwner mapOwner) {
- this.mapOwner = mapOwner;
- }
-
- public void setExternalLoaders(ClassLoader[] externalLoaders) {
- this.externalLoaders = externalLoaders;
- }
-
- public void setChannelSendOptions(int channelSendOptions) {
- this.channelSendOptions = channelSendOptions;
- }
-
- public void setAccessTimeout(long accessTimeout) {
- this.accessTimeout = accessTimeout;
- }
-
-}
diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/DisableMcastInterceptor.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/DisableMcastInterceptor.java
deleted file mode 100644
index 1e4d3af50d..0000000000
--- a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/DisableMcastInterceptor.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tuscany.sca.endpoint.tribes;
-
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.group.ChannelInterceptorBase;
-
-/**
- * A static interceptor to disables multicast.
- * Can be removed when/if the function gets added to Tribes.
- * See Tomcat email http://markmail.org/message/doqu7pfl2hvvdfcl
- */
-public class DisableMcastInterceptor extends ChannelInterceptorBase {
-
- public DisableMcastInterceptor() {
- super();
- }
-
- public void start(int svc) throws ChannelException {
- svc = (svc & (~Channel.MBR_TX_SEQ));
- super.start(svc);
- }
-}
diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java
deleted file mode 100644
index 9e540743bf..0000000000
--- a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/MapStore.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tuscany.sca.endpoint.tribes;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-/**
- * The Map that can fire events on put/remove of entries
- */
-public abstract class MapStore extends ConcurrentHashMap<Object, Object> {
- private static final long serialVersionUID = -2127235547082144368L;
- private List<MapListener> listeners = new CopyOnWriteArrayList<MapListener>();
-
- protected MapStore(int initialCapacity, float loadFactor, int concurrencyLevel) {
- super(initialCapacity, loadFactor, concurrencyLevel);
- }
-
- @Override
- public Object put(Object key, Object value) {
- Object old = super.put(key, value);
- if (old != null) {
- for (MapListener listener : listeners) {
- listener.entryUpdated(key, old, value);
- }
- } else {
- for (MapListener listener : listeners) {
- listener.entryAdded(key, value);
- }
-
- }
- return old;
- }
-
- @Override
- public Object remove(Object key) {
- Object old = super.remove(key);
- if (old != null) {
- for (MapListener listener : listeners) {
- listener.entryRemoved(key, old);
- }
- }
- return old;
- }
-
- public void addListener(MapListener listener) {
- listeners.add(listener);
- }
-
- public List<MapListener> getListeners() {
- return listeners;
- }
-
- public boolean removeListener(MapListener listener) {
- return listeners.remove(listener);
- }
-
- public static interface MapListener {
- void entryAdded(Object key, Object value);
-
- void entryUpdated(Object key, Object oldValue, Object newValue);
-
- void entryRemoved(Object key, Object value);
- }
-}
diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
deleted file mode 100644
index 5dc627d88d..0000000000
--- a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
+++ /dev/null
@@ -1,420 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tuscany.sca.endpoint.tribes;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.URI;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelReceiver;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.group.GroupChannel;
-import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
-import org.apache.catalina.tribes.membership.McastService;
-import org.apache.catalina.tribes.membership.StaticMember;
-import org.apache.catalina.tribes.transport.ReceiverBase;
-import org.apache.tuscany.sca.assembly.Endpoint;
-import org.apache.tuscany.sca.core.ExtensionPointRegistry;
-import org.apache.tuscany.sca.core.LifeCycleListener;
-import org.apache.tuscany.sca.endpoint.tribes.AbstractReplicatedMap.MapEntry;
-import org.apache.tuscany.sca.endpoint.tribes.MapStore.MapListener;
-import org.apache.tuscany.sca.runtime.BaseEndpointRegistry;
-import org.apache.tuscany.sca.runtime.DomainRegistryURI;
-import org.apache.tuscany.sca.runtime.EndpointRegistry;
-import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
-
-/**
- * A replicated EndpointRegistry based on Apache Tomcat Tribes
- */
-public class ReplicatedEndpointRegistry extends BaseEndpointRegistry implements EndpointRegistry, LifeCycleListener,
- MapListener {
- private final static Logger logger = Logger.getLogger(ReplicatedEndpointRegistry.class.getName());
- private static final String MULTICAST_ADDRESS = "228.0.0.100";
- private static final int MULTICAST_PORT = 50000;
-
- private static final int FIND_REPEAT_COUNT = 10;
-
- private int port = MULTICAST_PORT;
- private String address = MULTICAST_ADDRESS;
- private String bind = null;
- private int timeout = 50;
- private String receiverAddress;
- private int receiverPort = 4000;
- private int receiverAutoBind = 100;
- private List<URI> staticRoutes;
-
- private ReplicatedMap map;
-
- private String id;
- private boolean noMultiCast;
-
- private static final GroupChannel createChannel(String address, int port, String bindAddress) {
-
- //create a channel
- GroupChannel channel = new GroupChannel();
- McastService mcastService = (McastService)channel.getMembershipService();
- mcastService.setPort(port);
- mcastService.setAddress(address);
-
- // REVIEW: In my case, there are multiple IP addresses
- // One for the WIFI and the other one for VPN. For some reason the VPN one doesn't support
- // Multicast
-
- if (bindAddress != null) {
- mcastService.setBind(bindAddress);
- } else {
- mcastService.setBind(getBindAddress());
- }
-
- return channel;
- }
-
- public ReplicatedEndpointRegistry(ExtensionPointRegistry registry,
- Map<String, String> attributes,
- String domainRegistryURI,
- String domainURI) {
- super(registry, attributes, domainRegistryURI, domainURI);
- getParameters(attributes, domainRegistryURI);
- }
-
- private Map<String, String> getParameters(Map<String, String> attributes, String domainRegistryURI) {
- Map<String, String> map = new HashMap<String, String>();
- if (attributes != null) {
- map.putAll(attributes);
- }
- URI uri = URI.create(domainRegistryURI);
- if (uri.getHost() != null) {
- map.put("address", uri.getHost());
- }
- if (uri.getPort() != -1) {
- map.put("port", String.valueOf(uri.getPort()));
- }
-
- if (domainRegistryURI.startsWith("tuscany")) {
- setTuscanyConfig(map, domainRegistryURI);
- setConfig(map);
- return map;
- }
-
- int index = domainRegistryURI.indexOf('?');
- if (index == -1) {
- setConfig(map);
- return map;
- }
- String query = domainRegistryURI.substring(index + 1);
- try {
- query = URLDecoder.decode(query, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new IllegalArgumentException(e);
- }
- String[] params = query.split("&");
- for (String param : params) {
- index = param.indexOf('=');
- if (index != -1) {
- map.put(param.substring(0, index), param.substring(index + 1));
- }
- }
- setConfig(map);
- return map;
- }
-
- private void setTuscanyConfig(Map<String, String> map, String domainRegistryURI) {
- DomainRegistryURI tuscanyURI = new DomainRegistryURI(domainRegistryURI);
- map.put("address", tuscanyURI.getMulticastAddress());
- map.put("port", Integer.toString(tuscanyURI.getMulticastPort()));
- map.put("bind", tuscanyURI.getBindAddress());
- map.put("receiverPort", Integer.toString(tuscanyURI.getListenPort()));
- if (tuscanyURI.isMulticastDisabled()) {
- map.put("nomcast", "true");
- }
- if (tuscanyURI.getRemotes().size() > 0) {
- String routes = "";
- for (int i=0; i<tuscanyURI.getRemotes().size(); i++) {
- routes += tuscanyURI.getRemotes().get(i);
- if (i < tuscanyURI.getRemotes().size()) {
- routes += ",";
- }
- }
- map.put("routes", routes);
- }
- }
-
- private void setConfig(Map<String, String> attributes) {
- String portStr = attributes.get("port");
- if (portStr != null) {
- port = Integer.parseInt(portStr);
- if (port == -1) {
- port = MULTICAST_PORT;
- }
- }
- String address = attributes.get("address");
- if (address == null) {
- address = MULTICAST_ADDRESS;
- }
- bind = attributes.get("bind");
- String timeoutStr = attributes.get("timeout");
- if (timeoutStr != null) {
- timeout = Integer.parseInt(timeoutStr);
- }
-
- String routesStr = attributes.get("routes");
- if (routesStr != null) {
- StringTokenizer st = new StringTokenizer(routesStr);
- staticRoutes = new ArrayList<URI>();
- while (st.hasMoreElements()) {
- staticRoutes.add(URI.create("tcp://" + st.nextToken()));
- }
- }
- String mcast = attributes.get("nomcast");
- if (mcast != null) {
- noMultiCast = Boolean.valueOf(mcast);
- }
- receiverAddress = attributes.get("receiverAddress");
- String recvPort = attributes.get("receiverPort");
- if (recvPort != null) {
- receiverPort = Integer.parseInt(recvPort);
- }
- String recvAutoBind = attributes.get("receiverAutoBind");
- if (recvAutoBind != null) {
- receiverAutoBind = Integer.parseInt(recvAutoBind);
- }
- }
-
- public void start() {
- if (map != null) {
- throw new IllegalStateException("The registry has already been started");
- }
- GroupChannel channel = createChannel(address, port, bind);
- map =
- new ReplicatedMap(null, channel, timeout, this.domainURI,
- new ClassLoader[] {ReplicatedEndpointRegistry.class.getClassLoader()});
- map.addListener(this);
-
- if (noMultiCast) {
- map.getChannel().addInterceptor(new DisableMcastInterceptor());
- }
-
- // Configure the receiver ports
- ChannelReceiver receiver = channel.getChannelReceiver();
- if (receiver instanceof ReceiverBase) {
- if (receiverAddress != null) {
- ((ReceiverBase)receiver).setAddress(receiverAddress);
- }
- ((ReceiverBase)receiver).setPort(receiverPort);
- ((ReceiverBase)receiver).setAutoBind(receiverAutoBind);
- }
-
- /*
- Object sender = channel.getChannelSender();
- if (sender instanceof ReplicationTransmitter) {
- sender = ((ReplicationTransmitter)sender).getTransport();
- }
- if (sender instanceof AbstractSender) {
- ((AbstractSender)sender).setKeepAliveCount(0);
- ((AbstractSender)sender).setMaxRetryAttempts(5);
- }
- */
-
- if (staticRoutes != null) {
- StaticMembershipInterceptor smi = new StaticMembershipInterceptor();
- for (URI staticRoute : staticRoutes) {
- Member member;
- try {
- // The port has to match the receiver port
- member = new StaticMember(staticRoute.getHost(), staticRoute.getPort(), 5000);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- smi.addStaticMember(member);
- logger.info("Added static route: " + staticRoute.getHost() + ":" + staticRoute.getPort());
- }
- smi.setLocalMember(map.getChannel().getLocalMember(false));
- map.getChannel().addInterceptor(smi);
- }
-
- try {
- map.getChannel().start(Channel.DEFAULT);
- } catch (ChannelException e) {
- throw new IllegalStateException(e);
- }
-
- }
-
- public void stop() {
- if (map != null) {
- map.removeListener(this);
- Channel channel = map.getChannel();
- map.breakdown();
- try {
- channel.stop(Channel.DEFAULT);
- } catch (ChannelException e) {
- logger.log(Level.WARNING, e.getMessage(), e);
- }
- map = null;
- }
- }
-
- public void addEndpoint(Endpoint endpoint) {
- map.put(endpoint.getURI(), endpoint);
- logger.info("Add endpoint - " + endpoint);
- }
-
- public List<Endpoint> findEndpoint(String uri) {
- List<Endpoint> foundEndpoints = new ArrayList<Endpoint>();
-
- // in the failure case we repeat the look up after a short
- // delay to take account of tribes replication delays
- int repeat = FIND_REPEAT_COUNT;
-
- while (repeat > 0) {
- for (Object v : map.values()) {
- Endpoint endpoint = (Endpoint)v;
- // TODO: implement more complete matching
- logger.fine("Matching against - " + endpoint);
- if (endpoint.matches(uri)) {
- MapEntry entry = map.getInternal(endpoint.getURI());
- // if (!entry.isPrimary()) {
- ((RuntimeEndpoint)endpoint).bind(registry, this);
- // }
- foundEndpoints.add(endpoint);
- logger.fine("Found endpoint with matching service - " + endpoint);
- repeat = 0;
- }
- // else the service name doesn't match
- }
-
- if (foundEndpoints.size() == 0) {
- // the service name doesn't match any endpoints so wait a little and try
- // again in case this is caused by tribes synch delays
- logger.info("Repeating endpoint reference match - " + uri);
- repeat--;
- try {
- Thread.sleep(1000);
- } catch (Exception ex) {
- // do nothing
- repeat = 0;
- }
- }
- }
-
- return foundEndpoints;
- }
-
- private boolean isLocal(MapEntry entry) {
- return entry.getPrimary().equals(map.getChannel().getLocalMember(false));
- }
-
- public Endpoint getEndpoint(String uri) {
- return (Endpoint)map.get(uri);
- }
-
- public List<Endpoint> getEndpoints() {
- return new ArrayList(map.values());
- }
-
- public void removeEndpoint(Endpoint endpoint) {
- map.remove(endpoint.getURI());
- logger.info("Remove endpoint - " + endpoint);
- }
-
- public void replicate(boolean complete) {
- map.replicate(complete);
- }
-
- public void updateEndpoint(String uri, Endpoint endpoint) {
- Endpoint oldEndpoint = getEndpoint(uri);
- if (oldEndpoint == null) {
- throw new IllegalArgumentException("Endpoint is not found: " + uri);
- }
- map.put(endpoint.getURI(), endpoint);
- }
-
- public void entryAdded(Object key, Object value) {
- MapEntry entry = (MapEntry)value;
- Endpoint newEp = (Endpoint)entry.getValue();
- if (!isLocal(entry)) {
- logger.info(id + " Remote endpoint added: " + entry.getValue());
- }
- endpointAdded(newEp);
- }
-
- public void entryRemoved(Object key, Object value) {
- MapEntry entry = (MapEntry)value;
- if (!isLocal(entry)) {
- logger.info(id + " Remote endpoint removed: " + entry.getValue());
- }
- endpointRemoved((Endpoint)entry.getValue());
- }
-
- public void entryUpdated(Object key, Object oldValue, Object newValue) {
- MapEntry oldEntry = (MapEntry)oldValue;
- MapEntry newEntry = (MapEntry)newValue;
- if (!isLocal(newEntry)) {
- logger.info(id + " Remote endpoint updated: " + newEntry.getValue());
- }
- Endpoint oldEp = (Endpoint)oldEntry.getValue();
- Endpoint newEp = (Endpoint)newEntry.getValue();
- endpointUpdated(oldEp, newEp);
- }
-
- private static String getBindAddress() {
- try {
- Enumeration<NetworkInterface> nis = NetworkInterface.getNetworkInterfaces();
- while (nis.hasMoreElements()) {
- NetworkInterface ni = nis.nextElement();
- // The following APIs require JDK 1.6
- /*
- if (ni.isLoopback() || !ni.isUp() || !ni.supportsMulticast()) {
- continue;
- }
- */
- Enumeration<InetAddress> ips = ni.getInetAddresses();
- if (!ips.hasMoreElements()) {
- continue;
- }
- while (ips.hasMoreElements()) {
- InetAddress addr = ips.nextElement();
- if (addr.isLoopbackAddress()) {
- continue;
- }
- return addr.getHostAddress();
- }
- }
- return InetAddress.getLocalHost().getHostAddress();
- } catch (Exception e) {
- logger.log(Level.SEVERE, e.getMessage(), e);
- return null;
- }
- }
-
-}
diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java
deleted file mode 100644
index 669ad82192..0000000000
--- a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tuscany.sca.endpoint.tribes;
-
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.catalina.tribes.Channel;
-import org.apache.catalina.tribes.ChannelException;
-import org.apache.catalina.tribes.ChannelListener;
-import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.group.RpcCallback;
-
-/**
- * This file is copied from:
- * https://svn.apache.org/repos/asf/tomcat/tc6.0.x/tags/TOMCAT_6_0_20/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
- *
- */
-public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback, ChannelListener, MembershipListener {
- private static final long serialVersionUID = -6318779627600581121L;
- protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(ReplicatedMap.class);
-
- //------------------------------------------------------------------------------
- // CONSTRUCTORS / DESTRUCTORS
- //------------------------------------------------------------------------------
- /**
- * Creates a new map
- * @param channel The channel to use for communication
- * @param timeout long - timeout for RPC messags
- * @param mapContextName String - unique name for this map, to allow multiple maps per channel
- * @param initialCapacity int - the size of this map, see HashMap
- * @param loadFactor float - load factor, see HashMap
- */
- public ReplicatedMap(MapOwner owner,
- Channel channel,
- long timeout,
- String mapContextName,
- int initialCapacity,
- float loadFactor,
- ClassLoader[] cls) {
- super(owner, channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT, cls);
- }
-
- /**
- * Creates a new map
- * @param channel The channel to use for communication
- * @param timeout long - timeout for RPC messags
- * @param mapContextName String - unique name for this map, to allow multiple maps per channel
- * @param initialCapacity int - the size of this map, see HashMap
- */
- public ReplicatedMap(MapOwner owner,
- Channel channel,
- long timeout,
- String mapContextName,
- int initialCapacity,
- ClassLoader[] cls) {
- super(owner, channel, timeout, mapContextName, initialCapacity, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR,
- Channel.SEND_OPTIONS_DEFAULT, cls);
- }
-
- /**
- * Creates a new map
- * @param channel The channel to use for communication
- * @param timeout long - timeout for RPC messags
- * @param mapContextName String - unique name for this map, to allow multiple maps per channel
- */
- public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, ClassLoader[] cls) {
- super(owner, channel, timeout, mapContextName, AbstractReplicatedMap.DEFAULT_INITIAL_CAPACITY,
- AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT, cls);
- }
-
- //------------------------------------------------------------------------------
- // METHODS TO OVERRIDE
- //------------------------------------------------------------------------------
- protected int getStateMessageType() {
- return AbstractReplicatedMap.MapMessage.MSG_STATE_COPY;
- }
-
- /**
- * publish info about a map pair (key/value) to other nodes in the cluster
- * @param key Object
- * @param value Object
- * @return Member - the backup node
- * @throws ChannelException
- */
- protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException {
- if (!(key instanceof Serializable && value instanceof Serializable))
- return new Member[0];
- //select a backup node
- Member[] members = getMapMembers();
-
- if (members == null || members.length == 0) {
- return new Member[0];
- }
-
- //publish the data out to all nodes
- MapMessage msg =
- new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable)key, (Serializable)value,
- null, channel.getLocalMember(false), members);
-
- getChannel().send(members, msg, getChannelSendOptions());
-
- return members;
- }
-
- /**
- * Override the base method to look up existing entries only
- */
- public Object get(Object key) {
- MapEntry entry = super.getInternal(key);
- if (log.isTraceEnabled())
- log.trace("Requesting id:" + key + " entry:" + entry);
- if (entry == null) {
- return null;
- }
- return entry.getValue();
- }
-
- /**
- * Override the base method to remove all entries owned by the member that disappeared
- */
- public void memberDisappeared(Member member) {
- boolean removed = false;
- synchronized (mapMembers) {
- removed = (mapMembers.remove(member) != null);
- if (!removed) {
- if (log.isDebugEnabled())
- log.debug("Member[" + member + "] disappeared, but was not present in the map.");
- return; //the member was not part of our map.
- }
- }
-
- Iterator<Map.Entry<Object, Object>> i = super.entrySetFull().iterator();
- while (i.hasNext()) {
- Map.Entry<Object, Object> e = i.next();
- MapEntry entry = (MapEntry)super.getInternal(e.getKey());
- if (entry == null) {
- continue;
- }
- if (member.equals(entry.getPrimary())) {
- if (log.isDebugEnabled())
- log.debug("[2] Primary disappeared");
- i.remove();
- } //end if
- } //while
- }
-}
diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/TribesDomainRegistryFactory.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/TribesDomainRegistryFactory.java
deleted file mode 100644
index 91314b4c37..0000000000
--- a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/TribesDomainRegistryFactory.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tuscany.sca.endpoint.tribes;
-
-import org.apache.tuscany.sca.core.ExtensionPointRegistry;
-import org.apache.tuscany.sca.runtime.BaseDomainRegistryFactory;
-import org.apache.tuscany.sca.runtime.EndpointRegistry;
-
-/**
- * The utility responsible for finding the endpoint regstry by the scheme and creating instances for the
- * given domain
- */
-public class TribesDomainRegistryFactory extends BaseDomainRegistryFactory {
- private final static String[] schemes = new String[] {"multicast", "tribes", "tuscany"};
-
- /**
- * @param extensionRegistry
- */
- public TribesDomainRegistryFactory(ExtensionPointRegistry registry) {
- super(registry);
- }
-
- protected EndpointRegistry createEndpointRegistry(String endpointRegistryURI, String domainURI) {
- EndpointRegistry endpointRegistry =
- new ReplicatedEndpointRegistry(registry, null, endpointRegistryURI, domainURI);
- return endpointRegistry;
- }
-
- public String[] getSupportedSchemes() {
- return schemes;
- }
-}
diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.DomainRegistryFactory b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.DomainRegistryFactory
deleted file mode 100644
index 734afb2ac5..0000000000
--- a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.DomainRegistryFactory
+++ /dev/null
@@ -1,17 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-org.apache.tuscany.sca.endpoint.tribes.TribesDomainRegistryFactory;ranking=50 \ No newline at end of file
diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java
deleted file mode 100644
index c28a303070..0000000000
--- a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tuscany.sca.endpoint.tribes;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.tuscany.sca.assembly.AssemblyFactory;
-import org.apache.tuscany.sca.assembly.Binding;
-import org.apache.tuscany.sca.assembly.Component;
-import org.apache.tuscany.sca.assembly.Endpoint;
-import org.apache.tuscany.sca.assembly.SCABindingFactory;
-import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry;
-import org.apache.tuscany.sca.core.ExtensionPointRegistry;
-import org.apache.tuscany.sca.core.FactoryExtensionPoint;
-import org.apache.tuscany.sca.runtime.EndpointListener;
-import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-
-// Ignore so its not run in the build yet till its working
-@Ignore("TUSCANY-3718")
-public class MultiRegTestCase implements EndpointListener {
- private static ExtensionPointRegistry extensionPoints;
- private static AssemblyFactory assemblyFactory;
- private static SCABindingFactory scaBindingFactory;
-
- @BeforeClass
- public static void init() {
- extensionPoints = new DefaultExtensionPointRegistry();
- FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class);
- assemblyFactory = factories.getFactory(AssemblyFactory.class);
- scaBindingFactory = factories.getFactory(SCABindingFactory.class);
- }
-
- @Test
- public void testReplication() throws Exception {
- RuntimeEndpoint ep1 = createEndpoint("ep1uri");
-
- // String host = InetAddress.getLocalHost().getHostAddress();
- String bind = "127.0.0.1"; // "9.65.158.31";
- String port1 = "8085";
- String port2 = "8086";
- String port3 = "8087";
- String range = "1";
-
- Map<String, String> attrs1 = new HashMap<String, String>();
- // attrs1.put("nomcast", "true");
- attrs1.put("bind", bind);
- attrs1.put("receiverPort", port1);
- attrs1.put("receiverAutoBind", range);
- // attrs1.put("routes", host + ":" + port2 + " " + host + ":" + port3);
- ReplicatedEndpointRegistry reg1 = new ReplicatedEndpointRegistry(extensionPoints, attrs1, "foo", "bar");
- reg1.addListener(this);
- reg1.start();
-
- Map<String, String> attrs2 = new HashMap<String, String>();
- // attrs2.put("nomcast", "true");
- attrs2.put("bind", bind);
- attrs2.put("receiverPort", port2);
- attrs2.put("receiverAutoBind", range);
- // attrs2.put("routes", host + ":"+port1);
- ReplicatedEndpointRegistry reg2 = new ReplicatedEndpointRegistry(extensionPoints, attrs2, "foo", "bar");
- reg2.addListener(this);
- reg2.start();
-
- Map<String, String> attrs3 = new HashMap<String, String>();
- // attrs3.put("nomcast", "true");
- attrs3.put("bind", bind);
- attrs3.put("receiverPort", port3);
- attrs3.put("receiverAutoBind", range);
- // attrs3.put("routes", host + ":"+port1);
- ReplicatedEndpointRegistry reg3 = new ReplicatedEndpointRegistry(extensionPoints, attrs3, "foo", "bar");
- reg3.addListener(this);
- reg3.start();
-
- ep1.bind(extensionPoints, reg1);
- reg1.addEndpoint(ep1);
- assertExists(reg1, "ep1uri");
- assertExists(reg2, "ep1uri");
- assertExists(reg3, "ep1uri");
-
- RuntimeEndpoint ep2 = createEndpoint("ep2uri");
- ep2.bind(extensionPoints, reg2);
- reg2.addEndpoint(ep2);
- assertExists(reg2, "ep2uri");
- assertExists(reg1, "ep2uri");
- assertExists(reg3, "ep2uri");
-
- reg1.stop();
- Thread.sleep(6000);
- Assert.assertNull(reg2.getEndpoint("ep1uri"));
- Assert.assertNull(reg3.getEndpoint("ep1uri"));
- assertExists(reg2, "ep2uri");
- assertExists(reg3, "ep2uri");
-
- reg1.start();
- ep1.bind(extensionPoints, reg1);
- reg1.addEndpoint(ep1);
- assertExists(reg1, "ep1uri");
- assertExists(reg2, "ep1uri");
- assertExists(reg3, "ep1uri");
-
- reg1.stop();
- reg2.stop();
- reg3.stop();
- System.out.println(); // closed
- }
-
- private Endpoint assertExists(ReplicatedEndpointRegistry reg, String uri) throws InterruptedException {
- Endpoint ep = null;
- int count = 0;
- while (ep == null && count < 15) {
- ep = reg.getEndpoint(uri);
- Thread.sleep(1000);
- count++;
- System.out.println(reg + ": tries=" + count);
- }
- Assert.assertNotNull(ep);
- Assert.assertEquals(uri, ep.getURI());
- return ep;
- }
-
- private RuntimeEndpoint createEndpoint(String uri) {
- RuntimeEndpoint ep = (RuntimeEndpoint) assemblyFactory.createEndpoint();
- Component comp = assemblyFactory.createComponent();
- ep.setComponent(comp);
- ep.setService(assemblyFactory.createComponentService());
- Binding b = scaBindingFactory.createSCABinding();
- ep.setBinding(b);
- ep.setURI(uri);
- return ep;
- }
-
- private void print(String prefix, Endpoint ep) {
- System.out.println(prefix + ": "+ep);
- }
-
- public void endpointAdded(Endpoint endpoint) {
- print("Added", endpoint);
- }
-
- public void endpointRemoved(Endpoint endpoint) {
- print("Removed", endpoint);
- }
-
- public void endpointUpdated(Endpoint oldEndpoint, Endpoint newEndpoint) {
- print("Updated", newEndpoint);
- }
-
-}
diff --git a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistryTestCase.java b/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistryTestCase.java
deleted file mode 100644
index 99c5472c82..0000000000
--- a/sca-java-2.x/branches/2.0-Beta2/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistryTestCase.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tuscany.sca.endpoint.tribes;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.tuscany.sca.assembly.AssemblyFactory;
-import org.apache.tuscany.sca.assembly.Endpoint;
-import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry;
-import org.apache.tuscany.sca.core.FactoryExtensionPoint;
-import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-@Ignore("TUSCANY-3718")
-public class ReplicatedEndpointRegistryTestCase {
-
- @Test
- // @Ignore("Ignore this test case for now as it might be sensitive to the multicast settings for a multi-homed machine")
- public void testReplicate() throws InterruptedException {
- DefaultExtensionPointRegistry extensionPoints = new DefaultExtensionPointRegistry();
- FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class);
- AssemblyFactory assemblyFactory = factories.getFactory(AssemblyFactory.class);
-
- Map<String, String> attrs = new HashMap<String, String>();
- attrs.put("bind", "127.0.0.1");
- ReplicatedEndpointRegistry ep1 = new ReplicatedEndpointRegistry(extensionPoints, attrs, "foo", "bar");
- System.out.println("ep1 is: " + ep1);
- ep1.start();
-
- Endpoint e1 = assemblyFactory.createEndpoint();
- e1.setURI("e1uri");
- ((RuntimeEndpoint) e1).bind(extensionPoints, ep1);
- ep1.addEndpoint(e1);
-
- Endpoint e1p = ep1.getEndpoint("e1uri");
- System.out.println("EP1 in Registry 1: " + e1p);
- Assert.assertNotNull(e1p);
-
- ReplicatedEndpointRegistry ep2 = new ReplicatedEndpointRegistry(extensionPoints, attrs, "foo", "bar");
- System.out.println("ep2 is: " + ep2);
- ep2.start();
- Thread.sleep(5000);
-
- Endpoint e1p2 = ep2.getEndpoint("e1uri");
- System.out.println("EP1 in Registry 2: " + e1p2);
- Assert.assertNotNull(e1p2);
-
- ReplicatedEndpointRegistry ep3 = new ReplicatedEndpointRegistry(extensionPoints, attrs, "foo", "bar");
- System.out.println("ep3 is: " + ep3);
- ep3.start();
- Thread.sleep(5000);
-
- Endpoint e1p3 = ep3.getEndpoint("e1uri");
- System.out.println("EP1 in Registry 3: " + e1p3);
- Assert.assertNotNull(e1p3);
-
- ep1.stop();
- ep2.stop();
- ep3.stop();
- }
-
- public static void main(String[] args) throws Exception {
- new ReplicatedEndpointRegistryTestCase().testReplicate();
- }
-}