diff options
author | rfeng <rfeng@13f79535-47bb-0310-9956-ffa450edef68> | 2010-01-27 04:33:47 +0000 |
---|---|---|
committer | rfeng <rfeng@13f79535-47bb-0310-9956-ffa450edef68> | 2010-01-27 04:33:47 +0000 |
commit | 0c1527f0415fb7d6c06b0ad2a1d4ce73cd728852 (patch) | |
tree | 7d0658a2e64c4fdec2957090fc70dfd0efed09d7 /sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl | |
parent | c2dfd0917dbb0793b7934ccc298e03187e6e3418 (diff) |
Fix multiple concurrency related issues
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@903542 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl')
3 files changed, 118 insertions, 112 deletions
diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java index 886e79197d..dbdbbaead8 100644 --- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java +++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java @@ -19,15 +19,12 @@ package org.apache.tuscany.sca.osgi.service.discovery.impl; -import static org.osgi.service.remoteserviceadmin.EndpointListener.ENDPOINT_LISTENER_SCOPE; - import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Dictionary; -import java.util.HashMap; import java.util.Hashtable; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; @@ -35,9 +32,11 @@ import java.util.logging.Logger; import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.core.LifeCycleListener; +import org.apache.tuscany.sca.core.UtilityExtensionPoint; import org.apache.tuscany.sca.node.NodeFactory; import org.apache.tuscany.sca.node.impl.NodeFactoryImpl; import org.apache.tuscany.sca.osgi.remoteserviceadmin.impl.OSGiHelper; +import org.apache.tuscany.sca.work.WorkScheduler; import org.osgi.framework.Bundle; import org.osgi.framework.BundleContext; import org.osgi.framework.Constants; @@ -60,13 +59,13 @@ public abstract class AbstractDiscoveryService implements Discovery, LifeCycleLi protected BundleContext context; protected ExtensionPointRegistry registry; + private WorkScheduler workScheduler; - private Map<String, List<EndpointListener>> filtersToListeners = new HashMap<String, List<EndpointListener>>(); - // this is effectively a set which allows for multiple service descriptions with the - // same interface name but different properties and takes care of itself with respect to concurrency - protected Map<EndpointDescription, Bundle> endpointDescriptions = new ConcurrentHashMap<EndpointDescription, Bundle>(); private Map<EndpointListener, Collection<String>> listenersToFilters = - new HashMap<EndpointListener, Collection<String>>(); + new ConcurrentHashMap<EndpointListener, Collection<String>>(); + + protected Map<EndpointDescription, Bundle> endpointDescriptions = + new ConcurrentHashMap<EndpointDescription, Bundle>(); private ServiceTracker trackerTracker; public AbstractDiscoveryService(BundleContext context) { @@ -75,6 +74,10 @@ public abstract class AbstractDiscoveryService implements Discovery, LifeCycleLi } public void start() { + getExtensionPointRegistry(); + UtilityExtensionPoint utilityExtensionPoint = registry.getExtensionPoint(UtilityExtensionPoint.class); + this.workScheduler = utilityExtensionPoint.getUtility(WorkScheduler.class); + // track the registration of EndpointListener trackerTracker = new ServiceTracker(this.context, EndpointListener.class.getName(), null) { public Object addingService(ServiceReference reference) { @@ -122,66 +125,52 @@ public abstract class AbstractDiscoveryService implements Discovery, LifeCycleLi return props; } - private synchronized void cacheTracker(ServiceReference reference, Object service) { + private void cacheTracker(ServiceReference reference, Object service) { if (service instanceof EndpointListener) { EndpointListener listener = (EndpointListener)service; - Collection<String> filters = - addTracker(reference, listener, ENDPOINT_LISTENER_SCOPE, filtersToListeners, listenersToFilters); - - triggerCallbacks(null, filters, listener, true); + Collection<String> filters = null; + Collection<EndpointDescription> endpoints = null; + synchronized (this) { + filters = addTracker(reference, listener, EndpointListener.ENDPOINT_LISTENER_SCOPE); + // Take a snapshot of the endpoints + triggerCallbacks(null, filters, listener); + } } } - private synchronized void clearTracker(Object service) { + private void clearTracker(Object service) { if (service instanceof EndpointListener) { - removeTracker((EndpointListener)service, filtersToListeners, listenersToFilters); + synchronized (this) { + removeTracker((EndpointListener)service); + } } } - private synchronized void updateTracker(ServiceReference reference, Object service) { + private void updateTracker(ServiceReference reference, Object service) { if (service instanceof EndpointListener) { EndpointListener listener = (EndpointListener)service; - if (logger.isLoggable(Level.FINE)) { - logger.fine("updating listener: " + listener); + Collection<String> oldFilters = null; + Collection<String> newFilters = null; + Collection<EndpointDescription> endpoints = null; + synchronized (this) { + if (logger.isLoggable(Level.FINE)) { + logger.fine("updating listener: " + listener); + } + oldFilters = removeTracker(listener); + newFilters = addTracker(reference, listener, EndpointListener.ENDPOINT_LISTENER_SCOPE); + triggerCallbacks(oldFilters, newFilters, listener); } - Collection<String> oldFilters = removeTracker(listener, filtersToListeners, listenersToFilters); - - Collection<String> newFilters = - addTracker(reference, listener, ENDPOINT_LISTENER_SCOPE, filtersToListeners, listenersToFilters); - - triggerCallbacks(oldFilters, newFilters, listener, true); } } private void triggerCallbacks(Collection<String> oldInterest, Collection<String> newInterest, - EndpointListener listener, - boolean isFilter) { + EndpointListener listener) { // compute delta between old & new interfaces/filters and // trigger callbacks for any entries in servicesInfo that // match any *additional* interface/filters - Collection<String> deltaInterest = new ArrayList<String>(); - if (newInterest != null && !newInterest.isEmpty()) { - if (oldInterest == null || oldInterest.isEmpty()) { - deltaInterest.addAll(newInterest); - } else { - Iterator<String> i = newInterest.iterator(); - while (i.hasNext()) { - String next = (String)i.next(); - if (!oldInterest.contains(next)) { - deltaInterest.add(next); - } - } - } - } + Collection<String> deltaInterest = getDelta(oldInterest, newInterest); - if (logger.isLoggable(Level.FINE)) { - if (endpointDescriptions.size() > 0) { - logger.fine("search for matches to trigger callbacks with delta: " + deltaInterest); - } else { - logger.fine("nothing to search for matches to trigger callbacks with delta: " + deltaInterest); - } - } Iterator<String> i = deltaInterest.iterator(); while (i.hasNext()) { String next = i.next(); @@ -191,21 +180,74 @@ public abstract class AbstractDiscoveryService implements Discovery, LifeCycleLi } } - private void triggerCallbacks(EndpointListener listener, String matchedFilter, EndpointDescription sd, int type) { + private Collection<String> getDelta(Collection<String> oldInterest, Collection<String> newInterest) { + if (newInterest == null) { + newInterest = Collections.emptySet(); + } + + Collection<String> deltaInterest = new ArrayList<String>(newInterest); + if (oldInterest == null) { + oldInterest = Collections.emptySet(); + } + deltaInterest.removeAll(oldInterest); + return deltaInterest; + } + + /** + * Notify the endpoint listener + * @param listener + * @param matchedFilter + * @param endpoint + * @param type + */ + private static void notify(EndpointListener listener, String matchedFilter, EndpointDescription endpoint, int type) { switch (type) { case ADDED: - listener.endpointAdded(sd, matchedFilter); + listener.endpointAdded(endpoint, matchedFilter); break; case REMOVED: - listener.endpointRemoved(sd, matchedFilter); + listener.endpointRemoved(endpoint, matchedFilter); break; case MODIFIED: - listener.endpointRemoved(sd, matchedFilter); - listener.endpointAdded(sd, matchedFilter); + listener.endpointRemoved(endpoint, matchedFilter); + listener.endpointAdded(endpoint, matchedFilter); break; } } + private static class Notifier implements Runnable { + private EndpointListener listener; + private String matchedFilter; + private EndpointDescription endpoint; + private int type; + + /** + * @param listener + * @param matchedFilter + * @param endpoint + * @param type + */ + public Notifier(EndpointListener listener, String matchedFilter, EndpointDescription endpoint, int type) { + super(); + this.listener = listener; + this.matchedFilter = matchedFilter; + this.endpoint = endpoint; + this.type = type; + } + + public void run() { + AbstractDiscoveryService.notify(listener, matchedFilter, endpoint, type); + } + } + + private void triggerCallbacks(EndpointListener listener, + String matchedFilter, + EndpointDescription endpoint, + int type) { + workScheduler.scheduleWork(new Notifier(listener, matchedFilter, endpoint, type)); + + } + private boolean filterMatches(String filterValue, EndpointDescription sd) { Filter filter = OSGiHelper.createFilter(context, filterValue); Hashtable<String, Object> props = new Hashtable<String, Object>(sd.getProperties()); @@ -215,63 +257,25 @@ public abstract class AbstractDiscoveryService implements Discovery, LifeCycleLi return filter != null ? filter.match(props) : false; } - static Collection<String> removeTracker(EndpointListener listener, - Map<String, List<EndpointListener>> forwardMap, - Map<EndpointListener, Collection<String>> reverseMap) { - Collection<String> collection = reverseMap.get(listener); - if (collection != null && !collection.isEmpty()) { - reverseMap.remove(listener); - Iterator<String> i = collection.iterator(); - while (i.hasNext()) { - String element = i.next(); - if (forwardMap.containsKey(element)) { - forwardMap.get(element).remove(listener); - } else { - // if the element wasn't on the forwardmap, its a new element and - // shouldn't be returned as part of the collection of old ones - i.remove(); - } - } - } - return collection; + private Collection<String> removeTracker(EndpointListener listener) { + return listenersToFilters.remove(listener); } - @SuppressWarnings("unchecked") - static Collection<String> addTracker(ServiceReference reference, - EndpointListener listener, - String property, - Map<String, List<EndpointListener>> forwardMap, - Map<EndpointListener, Collection<String>> reverseMap) { + private Collection<String> addTracker(ServiceReference reference, EndpointListener listener, String property) { Collection<String> collection = OSGiHelper.getStringCollection(reference, property); - if (logger.isLoggable(Level.FINE)) { - logger.fine("adding listener: " + listener - + " collection: " - + collection - + " registered against prop: " - + property); - } if (collection != null && !collection.isEmpty()) { - reverseMap.put(listener, new ArrayList<String>(collection)); - Iterator<String> i = collection.iterator(); - while (i.hasNext()) { - String element = i.next(); - if (forwardMap.containsKey(element)) { - forwardMap.get(element).add(listener); - } else { - List<EndpointListener> trackerList = new ArrayList<EndpointListener>(); - trackerList.add(listener); - forwardMap.put(element, trackerList); - } - } + listenersToFilters.put(listener, new ArrayList<String>(collection)); } return collection; } - protected synchronized void endpointChanged(EndpointDescription sd, int type) { - for (Map.Entry<EndpointListener, Collection<String>> entry : listenersToFilters.entrySet()) { - for (String filter : entry.getValue()) { - if (filterMatches(filter, sd)) { - triggerCallbacks(entry.getKey(), filter, sd, type); + protected void endpointChanged(EndpointDescription sd, int type) { + synchronized (this) { + for (Map.Entry<EndpointListener, Collection<String>> entry : listenersToFilters.entrySet()) { + for (String filter : entry.getValue()) { + if (filterMatches(filter, sd)) { + triggerCallbacks(entry.getKey(), filter, sd, type); + } } } } diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java index a02be672a2..c9c4cb6a69 100644 --- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java +++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java @@ -49,7 +49,6 @@ public class DomainDiscoveryService extends AbstractDiscoveryService implements public void start() { super.start(); - getExtensionPointRegistry(); this.domainRegistryFactory = registry.getExtensionPoint(UtilityExtensionPoint.class).getUtility(DomainRegistryFactory.class); domainRegistryFactory.addListener(this); @@ -80,7 +79,7 @@ public class DomainDiscoveryService extends AbstractDiscoveryService implements } } - public synchronized void endpointAdded(Endpoint endpoint) { + public void endpointAdded(Endpoint endpoint) { Implementation impl = endpoint.getComponent().getImplementation(); if (!(impl instanceof OSGiImplementation)) { return; @@ -94,20 +93,24 @@ public class DomainDiscoveryService extends AbstractDiscoveryService implements bundleContext = bundle != null ? bundle.getBundleContext() : null; } - // Notify the endpoint listeners - EndpointDescription description = createEndpointDescription(bundleContext, endpoint); - // Set the owning bundle to runtime bundle to avoid NPE + // Notify the endpoint listeners + EndpointDescription description = createEndpointDescription(bundleContext, endpoint); + // Set the owning bundle to runtime bundle to avoid NPE + synchronized (this) { endpointDescriptions.put(description, context.getBundle()); endpointChanged(description, ADDED); + } } - public synchronized void endpointRemoved(Endpoint endpoint) { - EndpointDescription description = createEndpointDescription(context, endpoint); + public void endpointRemoved(Endpoint endpoint) { + EndpointDescription description = createEndpointDescription(context, endpoint); + synchronized (this) { endpointDescriptions.remove(description); endpointChanged(description, REMOVED); + } } - public synchronized void endpointUpdated(Endpoint oldEndpoint, Endpoint newEndpoint) { + public void endpointUpdated(Endpoint oldEndpoint, Endpoint newEndpoint) { // FIXME: This is a quick and dirty way for the update endpointRemoved(oldEndpoint); endpointAdded(newEndpoint); @@ -124,7 +127,7 @@ public class DomainDiscoveryService extends AbstractDiscoveryService implements super.stop(); } } - + @Override protected Dictionary<String, Object> getProperties() { Dictionary<String, Object> props = super.getProperties(); diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/LocalDiscoveryService.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/LocalDiscoveryService.java index 56a830a1e6..a31d05acf0 100644 --- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/LocalDiscoveryService.java +++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/LocalDiscoveryService.java @@ -63,7 +63,6 @@ public class LocalDiscoveryService extends AbstractDiscoveryService implements B public void start() { super.start(); - getExtensionPointRegistry(); UtilityExtensionPoint utilities = this.registry.getExtensionPoint(UtilityExtensionPoint.class); this.deployer = utilities.getUtility(Deployer.class); |