Allow the replicated map to be listened on the add/remove/update of entries

git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@788201 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
rfeng 2009-06-24 22:34:12 +00:00
parent 145e23f46a
commit 9860bae756
6 changed files with 1852 additions and 13 deletions

View file

@ -14,8 +14,10 @@ Bundle-DocURL: http://www.apache.org/
Bundle-RequiredExecutionEnvironment: J2SE-1.5,JavaSE-1.6
Import-Package: org.apache.catalina.tribes,
org.apache.catalina.tribes.group,
org.apache.catalina.tribes.io,
org.apache.catalina.tribes.membership,
org.apache.catalina.tribes.tipis,
org.apache.catalina.tribes.util,
org.apache.juli.logging;resolution:=optional,
org.apache.tuscany.sca.assembly;version="2.0.0",
org.apache.tuscany.sca.core;version="2.0.0",

View file

@ -32,6 +32,7 @@
<dependency>
<groupId>org.apache.tomcat</groupId>
<artifactId>tribes</artifactId>
<!-- DO NOT upgrade to 6.0.20: https://issues.apache.org/bugzilla/show_bug.cgi?id=47419 -->
<version>6.0.18</version>
<scope>compile</scope>
</dependency>

View file

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tuscany.sca.endpoint.tribes;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* The Map that can fire events on put/remove of entries
*/
public abstract class MapStore extends ConcurrentHashMap<Object, Object> {
private static final long serialVersionUID = -2127235547082144368L;
private List<MapListener> listeners = new CopyOnWriteArrayList<MapListener>();
protected MapStore(int initialCapacity, float loadFactor, int concurrencyLevel) {
super(initialCapacity, loadFactor, concurrencyLevel);
}
@Override
public Object put(Object key, Object value) {
Object old = super.put(key, value);
if (old != null) {
for (MapListener listener : listeners) {
listener.entryUpdated(key, old, value);
}
} else {
for (MapListener listener : listeners) {
listener.entryAdded(key, value);
}
}
return old;
}
@Override
public Object remove(Object key) {
Object old = super.remove(key);
if (old != null) {
for (MapListener listener : listeners) {
listener.entryRemoved(key, old);
}
}
return old;
}
public void addListener(MapListener listener) {
listeners.add(listener);
}
public List<MapListener> getListeners() {
return listeners;
}
public boolean removeListener(MapListener listener) {
return listeners.remove(listener);
}
public static interface MapListener {
void entryAdded(Object key, Object value);
void entryUpdated(Object key, Object oldValue, Object newValue);
void entryRemoved(Object key, Object value);
}
}

View file

@ -19,9 +19,11 @@
package org.apache.tuscany.sca.endpoint.tribes;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Logger;
@ -29,19 +31,19 @@ import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.group.GroupChannel;
import org.apache.catalina.tribes.membership.McastService;
import org.apache.catalina.tribes.tipis.ReplicatedMap;
import org.apache.catalina.tribes.tipis.AbstractReplicatedMap.MapEntry;
import org.apache.tuscany.sca.assembly.Endpoint;
import org.apache.tuscany.sca.assembly.EndpointReference;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.LifeCycleListener;
import org.apache.tuscany.sca.endpoint.tribes.AbstractReplicatedMap.MapEntry;
import org.apache.tuscany.sca.endpoint.tribes.MapStore.MapListener;
import org.apache.tuscany.sca.runtime.EndpointListener;
import org.apache.tuscany.sca.runtime.EndpointRegistry;
/**
* A replicated EndpointRegistry based on Apache Tomcat Tribes
*/
public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleListener {
public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleListener, MapListener {
private final static Logger logger = Logger.getLogger(ReplicatedEndpointRegistry.class.getName());
private static final String MULTICAST_ADDRESS = "228.0.0.100";
private static final int MULTICAST_PORT = 50000;
@ -92,7 +94,6 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
mcastService.setBind(bindAddress);
}
// mcastService.setBind("192.168.1.100");
return channel;
}
@ -126,6 +127,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
map =
new ReplicatedMap(null, createChannel(address, port, bind), timeout, this.domainURI,
new ClassLoader[] {ReplicatedEndpointRegistry.class.getClassLoader()});
map.addListener(this);
try {
map.getChannel().start(Channel.DEFAULT);
} catch (ChannelException e) {
@ -135,6 +137,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
public void stop() {
if (map != null) {
map.removeListener(this);
Channel channel = map.getChannel();
map.breakdown();
try {
@ -151,12 +154,12 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
for (EndpointListener listener : listeners) {
listener.endpointAdded(endpoint);
}
logger.info("EndpointRegistry: Add endpoint - " + endpoint);
logger.info("Add endpoint - " + endpoint);
}
public void addEndpointReference(EndpointReference endpointReference) {
endpointreferences.add(endpointReference);
logger.info("EndpointRegistry: Add endpoint reference - " + endpointReference);
logger.info("Add endpoint reference - " + endpointReference);
}
public void addListener(EndpointListener listener) {
@ -210,7 +213,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
public List<Endpoint> findEndpoint(EndpointReference endpointReference) {
List<Endpoint> foundEndpoints = new ArrayList<Endpoint>();
logger.info("EndpointRegistry: Find endpoint for reference - " + endpointReference);
logger.info("Find endpoint for reference - " + endpointReference);
if (endpointReference.getReference() != null) {
Endpoint targetEndpoint = endpointReference.getTargetEndpoint();
@ -219,14 +222,14 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
// TODO: implement more complete matching
if (matches(targetEndpoint.getURI(), endpoint.getURI())) {
MapEntry entry = map.getInternal(endpoint.getURI());
if (!entry.getPrimary().equals(map.getChannel().getLocalMember(false))) {
if (!isLocal(entry)) {
endpoint.setRemote(true);
}
// if (!entry.isPrimary()) {
endpoint.setExtensionPointRegistry(registry);
endpoint.setExtensionPointRegistry(registry);
// }
foundEndpoints.add(endpoint);
logger.info("EndpointRegistry: Found endpoint with matching service - " + endpoint);
logger.info("Found endpoint with matching service - " + endpoint);
}
// else the service name doesn't match
}
@ -234,6 +237,10 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
return foundEndpoints;
}
private boolean isLocal(MapEntry entry) {
return entry.getPrimary().equals(map.getChannel().getLocalMember(false));
}
public List<EndpointReference> findEndpointReference(Endpoint endpoint) {
return endpointreferences;
}
@ -247,7 +254,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
}
public List<Endpoint> getEndpoints() {
return new ArrayList<Endpoint>(map.values());
return new ArrayList(map.values());
}
public List<EndpointListener> getListeners() {
@ -259,12 +266,12 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
for (EndpointListener listener : listeners) {
listener.endpointRemoved(endpoint);
}
logger.info("EndpointRegistry: Remove endpoint - " + endpoint);
logger.info("Remove endpoint - " + endpoint);
}
public void removeEndpointReference(EndpointReference endpointReference) {
endpointreferences.remove(endpointReference);
logger.info("EndpointRegistry: Remove endpoint reference - " + endpointReference);
logger.info("Remove endpoint reference - " + endpointReference);
}
public void removeListener(EndpointListener listener) {
@ -282,4 +289,70 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
}
}
public void entryAdded(Object key, Object value) {
MapEntry entry = (MapEntry)value;
if (!isLocal(entry)) {
logger.info("Remote endpoint added: " + entry.getValue());
}
Endpoint newEp = (Endpoint)entry.getValue();
for (EndpointListener listener : listeners) {
listener.endpointAdded(newEp);
}
}
public void entryRemoved(Object key, Object value) {
MapEntry entry = (MapEntry)value;
if (!isLocal(entry)) {
logger.info("Remote endpoint removed: " + entry.getValue());
}
Endpoint oldEp = (Endpoint)entry.getValue();
for (EndpointListener listener : listeners) {
listener.endpointRemoved(oldEp);
}
}
public void entryUpdated(Object key, Object oldValue, Object newValue) {
MapEntry oldEntry = (MapEntry)oldValue;
MapEntry newEntry = (MapEntry)newValue;
if (!isLocal(newEntry)) {
logger.info("Remote endpoint updated: " + newEntry.getValue());
}
Endpoint oldEp = (Endpoint)oldEntry.getValue();
Endpoint newEp = (Endpoint)newEntry.getValue();
for (EndpointListener listener : listeners) {
listener.endpointUpdated(oldEp, newEp);
}
}
public static void main(String[] args) throws Exception {
//create a channel
GroupChannel channel = new GroupChannel();
McastService mcastService = (McastService)channel.getMembershipService();
mcastService.setPort(MULTICAST_PORT);
mcastService.setAddress(MULTICAST_ADDRESS);
InetAddress localhost = InetAddress.getLocalHost();
// REVIEW: In my case, there are multiple IP addresses
// One for the WIFI and the other one for VPN. For some reason the VPN one doesn't support
// Multicast
// You can use "route add 228.0.0.0 mask 252.0.0.0 192.168.1.100"
// mcastService.setBind("192.168.1.100");
channel.start(Channel.DEFAULT);
ReplicatedMap map = new ReplicatedMap(null, channel, 50, "01", null);
map.put(UUID.randomUUID().toString(), localhost.getHostAddress());
for (int i = 0; i < 4; i++) {
Thread.sleep(3000);
System.out.println(localhost + ": " + map.keySet());
}
for (Object e : map.entrySetFull()) {
Map.Entry en = (Map.Entry)e;
AbstractReplicatedMap.MapEntry entry = (AbstractReplicatedMap.MapEntry)en.getValue();
System.out.println(entry);
}
map.breakdown();
channel.stop(Channel.DEFAULT);
}
}

View file

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tuscany.sca.endpoint.tribes;
import java.io.Serializable;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
import org.apache.catalina.tribes.group.RpcCallback;
/**
* This file is copied from:
* https://svn.apache.org/repos/asf/tomcat/tc6.0.x/tags/TOMCAT_6_0_20/java/org/apache/catalina/tribes/tipis/ReplicatedMap.java
*
*/
public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback, ChannelListener, MembershipListener {
private static final long serialVersionUID = -6318779627600581121L;
protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(ReplicatedMap.class);
//------------------------------------------------------------------------------
// CONSTRUCTORS / DESTRUCTORS
//------------------------------------------------------------------------------
/**
* Creates a new map
* @param channel The channel to use for communication
* @param timeout long - timeout for RPC messags
* @param mapContextName String - unique name for this map, to allow multiple maps per channel
* @param initialCapacity int - the size of this map, see HashMap
* @param loadFactor float - load factor, see HashMap
*/
public ReplicatedMap(MapOwner owner,
Channel channel,
long timeout,
String mapContextName,
int initialCapacity,
float loadFactor,
ClassLoader[] cls) {
super(owner, channel, timeout, mapContextName, initialCapacity, loadFactor, Channel.SEND_OPTIONS_DEFAULT, cls);
}
/**
* Creates a new map
* @param channel The channel to use for communication
* @param timeout long - timeout for RPC messags
* @param mapContextName String - unique name for this map, to allow multiple maps per channel
* @param initialCapacity int - the size of this map, see HashMap
*/
public ReplicatedMap(MapOwner owner,
Channel channel,
long timeout,
String mapContextName,
int initialCapacity,
ClassLoader[] cls) {
super(owner, channel, timeout, mapContextName, initialCapacity, AbstractReplicatedMap.DEFAULT_LOAD_FACTOR,
Channel.SEND_OPTIONS_DEFAULT, cls);
}
/**
* Creates a new map
* @param channel The channel to use for communication
* @param timeout long - timeout for RPC messags
* @param mapContextName String - unique name for this map, to allow multiple maps per channel
*/
public ReplicatedMap(MapOwner owner, Channel channel, long timeout, String mapContextName, ClassLoader[] cls) {
super(owner, channel, timeout, mapContextName, AbstractReplicatedMap.DEFAULT_INITIAL_CAPACITY,
AbstractReplicatedMap.DEFAULT_LOAD_FACTOR, Channel.SEND_OPTIONS_DEFAULT, cls);
}
//------------------------------------------------------------------------------
// METHODS TO OVERRIDE
//------------------------------------------------------------------------------
protected int getStateMessageType() {
return AbstractReplicatedMap.MapMessage.MSG_STATE_COPY;
}
/**
* publish info about a map pair (key/value) to other nodes in the cluster
* @param key Object
* @param value Object
* @return Member - the backup node
* @throws ChannelException
*/
protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException {
if (!(key instanceof Serializable && value instanceof Serializable))
return new Member[0];
//select a backup node
Member[] backup = getMapMembers();
if (backup == null || backup.length == 0)
return null;
//publish the data out to all nodes
MapMessage msg =
new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable)key, (Serializable)value,
null, channel.getLocalMember(false), backup);
getChannel().send(getMapMembers(), msg, getChannelSendOptions());
return backup;
}
}