summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java208
1 files changed, 106 insertions, 102 deletions
diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java
index 886e79197d..dbdbbaead8 100644
--- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java
+++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java
@@ -19,15 +19,12 @@
package org.apache.tuscany.sca.osgi.service.discovery.impl;
-import static org.osgi.service.remoteserviceadmin.EndpointListener.ENDPOINT_LISTENER_SCOPE;
-
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Dictionary;
-import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
@@ -35,9 +32,11 @@ import java.util.logging.Logger;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.LifeCycleListener;
+import org.apache.tuscany.sca.core.UtilityExtensionPoint;
import org.apache.tuscany.sca.node.NodeFactory;
import org.apache.tuscany.sca.node.impl.NodeFactoryImpl;
import org.apache.tuscany.sca.osgi.remoteserviceadmin.impl.OSGiHelper;
+import org.apache.tuscany.sca.work.WorkScheduler;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
@@ -60,13 +59,13 @@ public abstract class AbstractDiscoveryService implements Discovery, LifeCycleLi
protected BundleContext context;
protected ExtensionPointRegistry registry;
+ private WorkScheduler workScheduler;
- private Map<String, List<EndpointListener>> filtersToListeners = new HashMap<String, List<EndpointListener>>();
- // this is effectively a set which allows for multiple service descriptions with the
- // same interface name but different properties and takes care of itself with respect to concurrency
- protected Map<EndpointDescription, Bundle> endpointDescriptions = new ConcurrentHashMap<EndpointDescription, Bundle>();
private Map<EndpointListener, Collection<String>> listenersToFilters =
- new HashMap<EndpointListener, Collection<String>>();
+ new ConcurrentHashMap<EndpointListener, Collection<String>>();
+
+ protected Map<EndpointDescription, Bundle> endpointDescriptions =
+ new ConcurrentHashMap<EndpointDescription, Bundle>();
private ServiceTracker trackerTracker;
public AbstractDiscoveryService(BundleContext context) {
@@ -75,6 +74,10 @@ public abstract class AbstractDiscoveryService implements Discovery, LifeCycleLi
}
public void start() {
+ getExtensionPointRegistry();
+ UtilityExtensionPoint utilityExtensionPoint = registry.getExtensionPoint(UtilityExtensionPoint.class);
+ this.workScheduler = utilityExtensionPoint.getUtility(WorkScheduler.class);
+
// track the registration of EndpointListener
trackerTracker = new ServiceTracker(this.context, EndpointListener.class.getName(), null) {
public Object addingService(ServiceReference reference) {
@@ -122,66 +125,52 @@ public abstract class AbstractDiscoveryService implements Discovery, LifeCycleLi
return props;
}
- private synchronized void cacheTracker(ServiceReference reference, Object service) {
+ private void cacheTracker(ServiceReference reference, Object service) {
if (service instanceof EndpointListener) {
EndpointListener listener = (EndpointListener)service;
- Collection<String> filters =
- addTracker(reference, listener, ENDPOINT_LISTENER_SCOPE, filtersToListeners, listenersToFilters);
-
- triggerCallbacks(null, filters, listener, true);
+ Collection<String> filters = null;
+ Collection<EndpointDescription> endpoints = null;
+ synchronized (this) {
+ filters = addTracker(reference, listener, EndpointListener.ENDPOINT_LISTENER_SCOPE);
+ // Take a snapshot of the endpoints
+ triggerCallbacks(null, filters, listener);
+ }
}
}
- private synchronized void clearTracker(Object service) {
+ private void clearTracker(Object service) {
if (service instanceof EndpointListener) {
- removeTracker((EndpointListener)service, filtersToListeners, listenersToFilters);
+ synchronized (this) {
+ removeTracker((EndpointListener)service);
+ }
}
}
- private synchronized void updateTracker(ServiceReference reference, Object service) {
+ private void updateTracker(ServiceReference reference, Object service) {
if (service instanceof EndpointListener) {
EndpointListener listener = (EndpointListener)service;
- if (logger.isLoggable(Level.FINE)) {
- logger.fine("updating listener: " + listener);
+ Collection<String> oldFilters = null;
+ Collection<String> newFilters = null;
+ Collection<EndpointDescription> endpoints = null;
+ synchronized (this) {
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("updating listener: " + listener);
+ }
+ oldFilters = removeTracker(listener);
+ newFilters = addTracker(reference, listener, EndpointListener.ENDPOINT_LISTENER_SCOPE);
+ triggerCallbacks(oldFilters, newFilters, listener);
}
- Collection<String> oldFilters = removeTracker(listener, filtersToListeners, listenersToFilters);
-
- Collection<String> newFilters =
- addTracker(reference, listener, ENDPOINT_LISTENER_SCOPE, filtersToListeners, listenersToFilters);
-
- triggerCallbacks(oldFilters, newFilters, listener, true);
}
}
private void triggerCallbacks(Collection<String> oldInterest,
Collection<String> newInterest,
- EndpointListener listener,
- boolean isFilter) {
+ EndpointListener listener) {
// compute delta between old & new interfaces/filters and
// trigger callbacks for any entries in servicesInfo that
// match any *additional* interface/filters
- Collection<String> deltaInterest = new ArrayList<String>();
- if (newInterest != null && !newInterest.isEmpty()) {
- if (oldInterest == null || oldInterest.isEmpty()) {
- deltaInterest.addAll(newInterest);
- } else {
- Iterator<String> i = newInterest.iterator();
- while (i.hasNext()) {
- String next = (String)i.next();
- if (!oldInterest.contains(next)) {
- deltaInterest.add(next);
- }
- }
- }
- }
+ Collection<String> deltaInterest = getDelta(oldInterest, newInterest);
- if (logger.isLoggable(Level.FINE)) {
- if (endpointDescriptions.size() > 0) {
- logger.fine("search for matches to trigger callbacks with delta: " + deltaInterest);
- } else {
- logger.fine("nothing to search for matches to trigger callbacks with delta: " + deltaInterest);
- }
- }
Iterator<String> i = deltaInterest.iterator();
while (i.hasNext()) {
String next = i.next();
@@ -191,21 +180,74 @@ public abstract class AbstractDiscoveryService implements Discovery, LifeCycleLi
}
}
- private void triggerCallbacks(EndpointListener listener, String matchedFilter, EndpointDescription sd, int type) {
+ private Collection<String> getDelta(Collection<String> oldInterest, Collection<String> newInterest) {
+ if (newInterest == null) {
+ newInterest = Collections.emptySet();
+ }
+
+ Collection<String> deltaInterest = new ArrayList<String>(newInterest);
+ if (oldInterest == null) {
+ oldInterest = Collections.emptySet();
+ }
+ deltaInterest.removeAll(oldInterest);
+ return deltaInterest;
+ }
+
+ /**
+ * Notify the endpoint listener
+ * @param listener
+ * @param matchedFilter
+ * @param endpoint
+ * @param type
+ */
+ private static void notify(EndpointListener listener, String matchedFilter, EndpointDescription endpoint, int type) {
switch (type) {
case ADDED:
- listener.endpointAdded(sd, matchedFilter);
+ listener.endpointAdded(endpoint, matchedFilter);
break;
case REMOVED:
- listener.endpointRemoved(sd, matchedFilter);
+ listener.endpointRemoved(endpoint, matchedFilter);
break;
case MODIFIED:
- listener.endpointRemoved(sd, matchedFilter);
- listener.endpointAdded(sd, matchedFilter);
+ listener.endpointRemoved(endpoint, matchedFilter);
+ listener.endpointAdded(endpoint, matchedFilter);
break;
}
}
+ private static class Notifier implements Runnable {
+ private EndpointListener listener;
+ private String matchedFilter;
+ private EndpointDescription endpoint;
+ private int type;
+
+ /**
+ * @param listener
+ * @param matchedFilter
+ * @param endpoint
+ * @param type
+ */
+ public Notifier(EndpointListener listener, String matchedFilter, EndpointDescription endpoint, int type) {
+ super();
+ this.listener = listener;
+ this.matchedFilter = matchedFilter;
+ this.endpoint = endpoint;
+ this.type = type;
+ }
+
+ public void run() {
+ AbstractDiscoveryService.notify(listener, matchedFilter, endpoint, type);
+ }
+ }
+
+ private void triggerCallbacks(EndpointListener listener,
+ String matchedFilter,
+ EndpointDescription endpoint,
+ int type) {
+ workScheduler.scheduleWork(new Notifier(listener, matchedFilter, endpoint, type));
+
+ }
+
private boolean filterMatches(String filterValue, EndpointDescription sd) {
Filter filter = OSGiHelper.createFilter(context, filterValue);
Hashtable<String, Object> props = new Hashtable<String, Object>(sd.getProperties());
@@ -215,63 +257,25 @@ public abstract class AbstractDiscoveryService implements Discovery, LifeCycleLi
return filter != null ? filter.match(props) : false;
}
- static Collection<String> removeTracker(EndpointListener listener,
- Map<String, List<EndpointListener>> forwardMap,
- Map<EndpointListener, Collection<String>> reverseMap) {
- Collection<String> collection = reverseMap.get(listener);
- if (collection != null && !collection.isEmpty()) {
- reverseMap.remove(listener);
- Iterator<String> i = collection.iterator();
- while (i.hasNext()) {
- String element = i.next();
- if (forwardMap.containsKey(element)) {
- forwardMap.get(element).remove(listener);
- } else {
- // if the element wasn't on the forwardmap, its a new element and
- // shouldn't be returned as part of the collection of old ones
- i.remove();
- }
- }
- }
- return collection;
+ private Collection<String> removeTracker(EndpointListener listener) {
+ return listenersToFilters.remove(listener);
}
- @SuppressWarnings("unchecked")
- static Collection<String> addTracker(ServiceReference reference,
- EndpointListener listener,
- String property,
- Map<String, List<EndpointListener>> forwardMap,
- Map<EndpointListener, Collection<String>> reverseMap) {
+ private Collection<String> addTracker(ServiceReference reference, EndpointListener listener, String property) {
Collection<String> collection = OSGiHelper.getStringCollection(reference, property);
- if (logger.isLoggable(Level.FINE)) {
- logger.fine("adding listener: " + listener
- + " collection: "
- + collection
- + " registered against prop: "
- + property);
- }
if (collection != null && !collection.isEmpty()) {
- reverseMap.put(listener, new ArrayList<String>(collection));
- Iterator<String> i = collection.iterator();
- while (i.hasNext()) {
- String element = i.next();
- if (forwardMap.containsKey(element)) {
- forwardMap.get(element).add(listener);
- } else {
- List<EndpointListener> trackerList = new ArrayList<EndpointListener>();
- trackerList.add(listener);
- forwardMap.put(element, trackerList);
- }
- }
+ listenersToFilters.put(listener, new ArrayList<String>(collection));
}
return collection;
}
- protected synchronized void endpointChanged(EndpointDescription sd, int type) {
- for (Map.Entry<EndpointListener, Collection<String>> entry : listenersToFilters.entrySet()) {
- for (String filter : entry.getValue()) {
- if (filterMatches(filter, sd)) {
- triggerCallbacks(entry.getKey(), filter, sd, type);
+ protected void endpointChanged(EndpointDescription sd, int type) {
+ synchronized (this) {
+ for (Map.Entry<EndpointListener, Collection<String>> entry : listenersToFilters.entrySet()) {
+ for (String filter : entry.getValue()) {
+ if (filterMatches(filter, sd)) {
+ triggerCallbacks(entry.getKey(), filter, sd, type);
+ }
}
}
}