diff options
author | rfeng <rfeng@13f79535-47bb-0310-9956-ffa450edef68> | 2010-01-19 05:37:38 +0000 |
---|---|---|
committer | rfeng <rfeng@13f79535-47bb-0310-9956-ffa450edef68> | 2010-01-19 05:37:38 +0000 |
commit | 9da25a6532fcbb9ae66294f1ff9ea903653c83e5 (patch) | |
tree | a3b91c4a580b096d5bdaa38c0be963f4968b7d28 /sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java | |
parent | 347f83ffa0b5cf5e69289dbf290c09b01a227c5c (diff) |
Expose system definitions from the deployer
Add the removal of entries when the member leaves the group
Add listeners to the hazelcastInstance
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@900661 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java')
3 files changed, 89 insertions, 23 deletions
diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java index 47b3450594..fe683af025 100644 --- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java +++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java @@ -708,7 +708,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb // [rfeng] Change the behavior to replicate to all nodes if (entry.isPrimary() && self.equals(entry.getPrimary())) { try { - Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes()); + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); entry.setBackupNodes(backup); entry.setPrimary(self); } catch (ChannelException x) { @@ -768,7 +768,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb if (log.isDebugEnabled()) log.debug("[1] Primary choosing a new backup"); try { - Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes()); + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); entry.setBackupNodes(backup); entry.setPrimary(channel.getLocalMember(false)); } catch (ChannelException x) { @@ -798,7 +798,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb entry.setPrimary(channel.getLocalMember(false)); entry.setBackup(false); entry.setProxy(false); - Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes()); + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); entry.setBackupNodes(backup); if (mapOwner != null) mapOwner.objectMadePrimay(entry.getKey(), entry.getValue()); @@ -833,7 +833,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb return members[node]; } - protected abstract Member[] publishEntryInfo(Object key, Object value, Member[] backupNodes) throws ChannelException; + protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException; public void heartbeat() { try { @@ -916,7 +916,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb } if (entry.isBackup()) { //select a new backup node - backup = publishEntryInfo(key, entry.getValue(), entry.getBackupNodes()); + backup = publishEntryInfo(key, entry.getValue()); } else if (entry.isProxy()) { //invalidate the previous primary msg = @@ -997,7 +997,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb old = remove(key); try { if (notify) { - Member[] backup = publishEntryInfo(key, value, entry.getBackupNodes()); + Member[] backup = publishEntryInfo(key, value); entry.setBackupNodes(backup); } } catch (ChannelException x) { diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java index 0c2144ea49..42d2eda6a3 100644 --- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java +++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java @@ -38,11 +38,13 @@ import java.util.logging.Logger; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelReceiver; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.group.GroupChannel; import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor; import org.apache.catalina.tribes.membership.McastService; import org.apache.catalina.tribes.membership.StaticMember; +import org.apache.catalina.tribes.transport.ReceiverBase; import org.apache.tuscany.sca.assembly.Endpoint; import org.apache.tuscany.sca.assembly.EndpointReference; import org.apache.tuscany.sca.core.ExtensionPointRegistry; @@ -67,6 +69,9 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi private String address = MULTICAST_ADDRESS; private String bind = null; private int timeout = 50; + private int receiverPort = 4000; + private int receiverAutoBind = 100; + private List<URI> staticRoutes; private final static String DEFAULT_DOMAIN_URI = "http://tuscany.apache.org/sca/1.1/domains/default"; private String domainURI = DEFAULT_DOMAIN_URI; @@ -75,7 +80,6 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi private ExtensionPointRegistry registry; private ReplicatedMap map; - private static List<URI> staticRoutes; private String id; private boolean noMultiCast; @@ -175,6 +179,14 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi if (mcast != null) { noMultiCast = Boolean.valueOf(mcast); } + String recvPort = attributes.get("receiverPort"); + if (recvPort != null) { + receiverPort = Integer.parseInt(recvPort); + } + String recvAutoBind = attributes.get("receiverAutoBind"); + if (recvAutoBind != null) { + receiverAutoBind = Integer.parseInt(recvAutoBind); + } } public void start() { @@ -190,9 +202,24 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi if (noMultiCast) { map.getChannel().addInterceptor(new DisableMcastInterceptor()); } - - // http://www.mail-archive.com/users@tomcat.apache.org/msg24873.html - int port = channel.getChannelReceiver().getPort(); + + // Configure the receiver ports + ChannelReceiver receiver = channel.getChannelReceiver(); + if (receiver instanceof ReceiverBase) { + ((ReceiverBase)receiver).setAutoBind(receiverAutoBind); + ((ReceiverBase)receiver).setPort(receiverPort); + } + + /* + Object sender = channel.getChannelSender(); + if (sender instanceof ReplicationTransmitter) { + sender = ((ReplicationTransmitter)sender).getTransport(); + } + if (sender instanceof AbstractSender) { + ((AbstractSender)sender).setKeepAliveCount(0); + ((AbstractSender)sender).setMaxRetryAttempts(5); + } + */ if (staticRoutes != null) { StaticMembershipInterceptor smi = new StaticMembershipInterceptor(); @@ -200,12 +227,12 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi Member member; try { // The port has to match the receiver port - member = new StaticMember(staticRoute.getHost(), port, 5000); + member = new StaticMember(staticRoute.getHost(), staticRoute.getPort(), 5000); } catch (IOException e) { throw new RuntimeException(e); } smi.addStaticMember(member); - logger.info("Added static route: " + staticRoute.getHost() + ":" + port); + logger.info("Added static route: " + staticRoute.getHost() + ":" + staticRoute.getPort()); } smi.setLocalMember(map.getChannel().getLocalMember(false)); map.getChannel().addInterceptor(smi); @@ -410,6 +437,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi logger.info(id + " Remote endpoint removed: " + entry.getValue()); } Endpoint oldEp = (Endpoint)entry.getValue(); + ((RuntimeEndpoint) oldEp).bind(registry, this); for (EndpointListener listener : listeners) { listener.endpointRemoved(oldEp); } diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java index 762407604d..669ad82192 100644 --- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java +++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java @@ -17,6 +17,8 @@ package org.apache.tuscany.sca.endpoint.tribes; import java.io.Serializable; +import java.util.Iterator; +import java.util.Map; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; @@ -97,29 +99,65 @@ public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback, * @return Member - the backup node * @throws ChannelException */ - protected Member[] publishEntryInfo(Object key, Object value, Member[] backupNodes) throws ChannelException { + protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException { if (!(key instanceof Serializable && value instanceof Serializable)) return new Member[0]; //select a backup node - Member[] backup = getMapMembers(); + Member[] members = getMapMembers(); - if (backup == null || backup.length == 0) - return null; - - // Set the receivers to these members that are not in the backup nodes yet - Member[] members = backup; - if (backupNodes != null) { - members = getMapMembersExcl(backupNodes); + if (members == null || members.length == 0) { + return new Member[0]; } //publish the data out to all nodes MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable)key, (Serializable)value, - null, channel.getLocalMember(false), backup); + null, channel.getLocalMember(false), members); getChannel().send(members, msg, getChannelSendOptions()); - return backup; + return members; + } + + /** + * Override the base method to look up existing entries only + */ + public Object get(Object key) { + MapEntry entry = super.getInternal(key); + if (log.isTraceEnabled()) + log.trace("Requesting id:" + key + " entry:" + entry); + if (entry == null) { + return null; + } + return entry.getValue(); } + /** + * Override the base method to remove all entries owned by the member that disappeared + */ + public void memberDisappeared(Member member) { + boolean removed = false; + synchronized (mapMembers) { + removed = (mapMembers.remove(member) != null); + if (!removed) { + if (log.isDebugEnabled()) + log.debug("Member[" + member + "] disappeared, but was not present in the map."); + return; //the member was not part of our map. + } + } + + Iterator<Map.Entry<Object, Object>> i = super.entrySetFull().iterator(); + while (i.hasNext()) { + Map.Entry<Object, Object> e = i.next(); + MapEntry entry = (MapEntry)super.getInternal(e.getKey()); + if (entry == null) { + continue; + } + if (member.equals(entry.getPrimary())) { + if (log.isDebugEnabled()) + log.debug("[2] Primary disappeared"); + i.remove(); + } //end if + } //while + } } |