summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java
diff options
context:
space:
mode:
authorrfeng <rfeng@13f79535-47bb-0310-9956-ffa450edef68>2010-01-19 05:37:38 +0000
committerrfeng <rfeng@13f79535-47bb-0310-9956-ffa450edef68>2010-01-19 05:37:38 +0000
commit9da25a6532fcbb9ae66294f1ff9ea903653c83e5 (patch)
treea3b91c4a580b096d5bdaa38c0be963f4968b7d28 /sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java
parent347f83ffa0b5cf5e69289dbf290c09b01a227c5c (diff)
Expose system definitions from the deployer
Add the removal of entries when the member leaves the group Add listeners to the hazelcastInstance git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@900661 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java')
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java12
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java40
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java60
3 files changed, 89 insertions, 23 deletions
diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java
index 47b3450594..fe683af025 100644
--- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java
+++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java
@@ -708,7 +708,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb
// [rfeng] Change the behavior to replicate to all nodes
if (entry.isPrimary() && self.equals(entry.getPrimary())) {
try {
- Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes());
+ Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
entry.setBackupNodes(backup);
entry.setPrimary(self);
} catch (ChannelException x) {
@@ -768,7 +768,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb
if (log.isDebugEnabled())
log.debug("[1] Primary choosing a new backup");
try {
- Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes());
+ Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
entry.setBackupNodes(backup);
entry.setPrimary(channel.getLocalMember(false));
} catch (ChannelException x) {
@@ -798,7 +798,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb
entry.setPrimary(channel.getLocalMember(false));
entry.setBackup(false);
entry.setProxy(false);
- Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes());
+ Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
entry.setBackupNodes(backup);
if (mapOwner != null)
mapOwner.objectMadePrimay(entry.getKey(), entry.getValue());
@@ -833,7 +833,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb
return members[node];
}
- protected abstract Member[] publishEntryInfo(Object key, Object value, Member[] backupNodes) throws ChannelException;
+ protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException;
public void heartbeat() {
try {
@@ -916,7 +916,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb
}
if (entry.isBackup()) {
//select a new backup node
- backup = publishEntryInfo(key, entry.getValue(), entry.getBackupNodes());
+ backup = publishEntryInfo(key, entry.getValue());
} else if (entry.isProxy()) {
//invalidate the previous primary
msg =
@@ -997,7 +997,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb
old = remove(key);
try {
if (notify) {
- Member[] backup = publishEntryInfo(key, value, entry.getBackupNodes());
+ Member[] backup = publishEntryInfo(key, value);
entry.setBackupNodes(backup);
}
} catch (ChannelException x) {
diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
index 0c2144ea49..42d2eda6a3 100644
--- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
+++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
@@ -38,11 +38,13 @@ import java.util.logging.Logger;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelReceiver;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.GroupChannel;
import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
import org.apache.catalina.tribes.membership.McastService;
import org.apache.catalina.tribes.membership.StaticMember;
+import org.apache.catalina.tribes.transport.ReceiverBase;
import org.apache.tuscany.sca.assembly.Endpoint;
import org.apache.tuscany.sca.assembly.EndpointReference;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
@@ -67,6 +69,9 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
private String address = MULTICAST_ADDRESS;
private String bind = null;
private int timeout = 50;
+ private int receiverPort = 4000;
+ private int receiverAutoBind = 100;
+ private List<URI> staticRoutes;
private final static String DEFAULT_DOMAIN_URI = "http://tuscany.apache.org/sca/1.1/domains/default";
private String domainURI = DEFAULT_DOMAIN_URI;
@@ -75,7 +80,6 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
private ExtensionPointRegistry registry;
private ReplicatedMap map;
- private static List<URI> staticRoutes;
private String id;
private boolean noMultiCast;
@@ -175,6 +179,14 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
if (mcast != null) {
noMultiCast = Boolean.valueOf(mcast);
}
+ String recvPort = attributes.get("receiverPort");
+ if (recvPort != null) {
+ receiverPort = Integer.parseInt(recvPort);
+ }
+ String recvAutoBind = attributes.get("receiverAutoBind");
+ if (recvAutoBind != null) {
+ receiverAutoBind = Integer.parseInt(recvAutoBind);
+ }
}
public void start() {
@@ -190,9 +202,24 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
if (noMultiCast) {
map.getChannel().addInterceptor(new DisableMcastInterceptor());
}
-
- // http://www.mail-archive.com/users@tomcat.apache.org/msg24873.html
- int port = channel.getChannelReceiver().getPort();
+
+ // Configure the receiver ports
+ ChannelReceiver receiver = channel.getChannelReceiver();
+ if (receiver instanceof ReceiverBase) {
+ ((ReceiverBase)receiver).setAutoBind(receiverAutoBind);
+ ((ReceiverBase)receiver).setPort(receiverPort);
+ }
+
+ /*
+ Object sender = channel.getChannelSender();
+ if (sender instanceof ReplicationTransmitter) {
+ sender = ((ReplicationTransmitter)sender).getTransport();
+ }
+ if (sender instanceof AbstractSender) {
+ ((AbstractSender)sender).setKeepAliveCount(0);
+ ((AbstractSender)sender).setMaxRetryAttempts(5);
+ }
+ */
if (staticRoutes != null) {
StaticMembershipInterceptor smi = new StaticMembershipInterceptor();
@@ -200,12 +227,12 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
Member member;
try {
// The port has to match the receiver port
- member = new StaticMember(staticRoute.getHost(), port, 5000);
+ member = new StaticMember(staticRoute.getHost(), staticRoute.getPort(), 5000);
} catch (IOException e) {
throw new RuntimeException(e);
}
smi.addStaticMember(member);
- logger.info("Added static route: " + staticRoute.getHost() + ":" + port);
+ logger.info("Added static route: " + staticRoute.getHost() + ":" + staticRoute.getPort());
}
smi.setLocalMember(map.getChannel().getLocalMember(false));
map.getChannel().addInterceptor(smi);
@@ -410,6 +437,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
logger.info(id + " Remote endpoint removed: " + entry.getValue());
}
Endpoint oldEp = (Endpoint)entry.getValue();
+ ((RuntimeEndpoint) oldEp).bind(registry, this);
for (EndpointListener listener : listeners) {
listener.endpointRemoved(oldEp);
}
diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java
index 762407604d..669ad82192 100644
--- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java
+++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java
@@ -17,6 +17,8 @@
package org.apache.tuscany.sca.endpoint.tribes;
import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
@@ -97,29 +99,65 @@ public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback,
* @return Member - the backup node
* @throws ChannelException
*/
- protected Member[] publishEntryInfo(Object key, Object value, Member[] backupNodes) throws ChannelException {
+ protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException {
if (!(key instanceof Serializable && value instanceof Serializable))
return new Member[0];
//select a backup node
- Member[] backup = getMapMembers();
+ Member[] members = getMapMembers();
- if (backup == null || backup.length == 0)
- return null;
-
- // Set the receivers to these members that are not in the backup nodes yet
- Member[] members = backup;
- if (backupNodes != null) {
- members = getMapMembersExcl(backupNodes);
+ if (members == null || members.length == 0) {
+ return new Member[0];
}
//publish the data out to all nodes
MapMessage msg =
new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable)key, (Serializable)value,
- null, channel.getLocalMember(false), backup);
+ null, channel.getLocalMember(false), members);
getChannel().send(members, msg, getChannelSendOptions());
- return backup;
+ return members;
+ }
+
+ /**
+ * Override the base method to look up existing entries only
+ */
+ public Object get(Object key) {
+ MapEntry entry = super.getInternal(key);
+ if (log.isTraceEnabled())
+ log.trace("Requesting id:" + key + " entry:" + entry);
+ if (entry == null) {
+ return null;
+ }
+ return entry.getValue();
}
+ /**
+ * Override the base method to remove all entries owned by the member that disappeared
+ */
+ public void memberDisappeared(Member member) {
+ boolean removed = false;
+ synchronized (mapMembers) {
+ removed = (mapMembers.remove(member) != null);
+ if (!removed) {
+ if (log.isDebugEnabled())
+ log.debug("Member[" + member + "] disappeared, but was not present in the map.");
+ return; //the member was not part of our map.
+ }
+ }
+
+ Iterator<Map.Entry<Object, Object>> i = super.entrySetFull().iterator();
+ while (i.hasNext()) {
+ Map.Entry<Object, Object> e = i.next();
+ MapEntry entry = (MapEntry)super.getInternal(e.getKey());
+ if (entry == null) {
+ continue;
+ }
+ if (member.equals(entry.getPrimary())) {
+ if (log.isDebugEnabled())
+ log.debug("[2] Primary disappeared");
+ i.remove();
+ } //end if
+ } //while
+ }
}