summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java
diff options
context:
space:
mode:
authorrfeng <rfeng@13f79535-47bb-0310-9956-ffa450edef68>2010-01-27 04:33:47 +0000
committerrfeng <rfeng@13f79535-47bb-0310-9956-ffa450edef68>2010-01-27 04:33:47 +0000
commit0c1527f0415fb7d6c06b0ad2a1d4ce73cd728852 (patch)
tree7d0658a2e64c4fdec2957090fc70dfd0efed09d7 /sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java
parentc2dfd0917dbb0793b7934ccc298e03187e6e3418 (diff)
Fix multiple concurrency related issues
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@903542 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to '')
-rw-r--r--sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointIntrospector.java13
-rw-r--r--sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/OSGiHelper.java9
-rw-r--r--sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/TopologyManagerImpl.java122
-rw-r--r--sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java208
-rw-r--r--sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java21
-rw-r--r--sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/LocalDiscoveryService.java1
6 files changed, 201 insertions, 173 deletions
diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointIntrospector.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointIntrospector.java
index d27a8708ee..412e119c76 100644
--- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointIntrospector.java
+++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointIntrospector.java
@@ -26,7 +26,6 @@ import static org.osgi.framework.Constants.OBJECTCLASS;
import static org.osgi.framework.Constants.SERVICE_ID;
import java.io.IOException;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -267,6 +266,7 @@ public class EndpointIntrospector {
return contribution;
}
+ /*
public Contribution loadContribution(Bundle bundle, Composite composite) {
try {
URL root = bundle.getEntry("/");
@@ -277,6 +277,7 @@ public class EndpointIntrospector {
throw new ServiceRuntimeException(e);
}
}
+ */
/**
* Generate a contribution that contains the composite for the exported service
@@ -340,16 +341,16 @@ public class EndpointIntrospector {
}
// FIXME: Should we scan the owning bundle to create the SCA contribution?
- Contribution contribution = loadContribution(bundle, composite);
+ Contribution contribution = loadContribution(bundle, id, composite);
return contribution;
}
- private Contribution createContribution(Bundle bundle, String id, Composite composite) {
+ private Contribution loadContribution(Bundle bundle, String id, Composite composite) {
Contribution contribution = contributionFactory.createContribution();
contribution.setClassLoader(OSGiHelper.createBundleClassLoader(bundle));
- contribution.setURI("urn:" + id);
+ contribution.setURI(id);
contribution.setLocation(bundle.getEntry("/").toString());
- contribution.getDeployables().add(composite);
+ deployer.attachDeploymentComposite(contribution, composite, false);
ModelResolver modelResolver = new ExtensibleModelResolver(contribution, modelResolvers, factories);
contribution.setModelResolver(modelResolver);
// compositeProcessor.resolve(composite, modelResolver, new ProcessorContext(registry));
@@ -433,7 +434,7 @@ public class EndpointIntrospector {
componentReference.getBindings().addAll(bindings);
}
- Contribution contribution = loadContribution(bundle, composite);
+ Contribution contribution = loadContribution(bundle, id, composite);
return contribution;
}
diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/OSGiHelper.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/OSGiHelper.java
index a4b51d9d0c..b0c55fea71 100644
--- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/OSGiHelper.java
+++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/OSGiHelper.java
@@ -109,8 +109,9 @@ public class OSGiHelper {
if (value == null) {
return Collections.emptyList();
}
- String paths[] = value.trim().split("( |\t|\n|\r|\f|,)+");
- if (paths.length == 0) {
+ value = value.trim();
+ String paths[] = value.split("( |\t|\n|\r|\f|,)+");
+ if ("".equals(value) || paths.length == 0) {
if (defaultValue != null) {
paths = new String[] {defaultValue};
} else {
@@ -119,6 +120,10 @@ public class OSGiHelper {
}
Collection<URL> files = new HashSet<URL>();
for (String path : paths) {
+ if ("".equals(path)) {
+ // Skip empty ones
+ continue;
+ }
if (path.endsWith("/")) {
path = path + "*.xml";
}
diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/TopologyManagerImpl.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/TopologyManagerImpl.java
index 33faa77c84..97c8b345fa 100644
--- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/TopologyManagerImpl.java
+++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/TopologyManagerImpl.java
@@ -23,6 +23,7 @@ import static org.apache.tuscany.sca.implementation.osgi.OSGiProperty.SERVICE_EX
import static org.osgi.service.remoteserviceadmin.RemoteConstants.SERVICE_EXPORTED_CONFIGS;
import static org.osgi.service.remoteserviceadmin.RemoteConstants.SERVICE_IMPORTED;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Dictionary;
import java.util.HashMap;
@@ -190,10 +191,11 @@ public class TopologyManagerImpl implements ListenerHook, RemoteServiceAdminList
* @see org.osgi.framework.hooks.service.ListenerHook#added(java.util.Collection)
*/
public void added(Collection listeners) {
- synchronized (serviceListeners) {
- try {
+ boolean changed = false;
+ String[] filters = null;
+ try {
+ synchronized (serviceListeners) {
Collection<ListenerInfo> listenerInfos = (Collection<ListenerInfo>)listeners;
- boolean changed = false;
for (ListenerInfo l : listenerInfos) {
if (l.getBundleContext().getBundle().getBundleId() == 0L || l.getBundleContext() == context) {
// Ignore system and tuscany bundle
@@ -217,75 +219,86 @@ public class TopologyManagerImpl implements ListenerHook, RemoteServiceAdminList
}
}
if (changed) {
- updateEndpointListenerScope();
- }
- } catch (Throwable e) {
- logger.log(Level.SEVERE, e.getMessage(), e);
- if (e instanceof Error) {
- throw (Error)e;
- } else if (e instanceof RuntimeException) {
- throw (RuntimeException)e;
- } else {
- // Should not happen
- throw new RuntimeException(e);
+ filters = getFilters();
}
}
+ if (changed) {
+ updateEndpointListenerScope(filters);
+ }
+ } catch (Throwable e) {
+ logger.log(Level.SEVERE, e.getMessage(), e);
+ if (e instanceof Error) {
+ throw (Error)e;
+ } else if (e instanceof RuntimeException) {
+ throw (RuntimeException)e;
+ } else {
+ // Should not happen
+ throw new RuntimeException(e);
+ }
}
}
- private void updateEndpointListenerScope() {
- Set<String> filterSet = serviceListeners.keySet();
-
+ private void updateEndpointListenerScope(String[] filters) {
Dictionary<String, Object> props = new Hashtable<String, Object>();
- props.put(ENDPOINT_LISTENER_SCOPE, filterSet.toArray(new String[filterSet.size()]));
+ props.put(ENDPOINT_LISTENER_SCOPE, filters);
endpointListener.setProperties(props);
}
+ private String[] getFilters() {
+ Set<String> filterSet = serviceListeners.keySet();
+ String[] filters = filterSet.toArray(new String[filterSet.size()]);
+ return filters;
+ }
+
private CollectionMap<Class<?>, ListenerInfo> findServiceListeners(EndpointDescription endpointDescription,
String matchedFilter) {
+ Collection<ListenerInfo> listeners = null;
synchronized (serviceListeners) {
// First find all the listeners that have the matching filter
- Collection<ListenerInfo> listeners = serviceListeners.get(matchedFilter);
+ listeners = serviceListeners.get(matchedFilter);
if (listeners == null) {
return null;
}
+ listeners = new ArrayList<ListenerInfo>(listeners);
+ }
- // Try to partition the listeners by the interface classes
- List<String> interfaceNames = endpointDescription.getInterfaces();
- CollectionMap<Class<?>, ListenerInfo> interfaceToListeners = new CollectionMap<Class<?>, ListenerInfo>();
- for (String i : interfaceNames) {
- for (Iterator<ListenerInfo> it = listeners.iterator(); it.hasNext();) {
+ // Try to partition the listeners by the interface classes
+ List<String> interfaceNames = endpointDescription.getInterfaces();
+ CollectionMap<Class<?>, ListenerInfo> interfaceToListeners = new CollectionMap<Class<?>, ListenerInfo>();
+ for (String i : interfaceNames) {
+ for (Iterator<ListenerInfo> it = listeners.iterator(); it.hasNext();) {
+ try {
+ ListenerInfo listener = it.next();
+ if (listener.isRemoved()) {
+ it.remove();
+ continue;
+ }
try {
- ListenerInfo listener = it.next();
- if (listener.isRemoved()) {
- it.remove();
- continue;
- }
- try {
- Class<?> interfaceClass = listener.getBundleContext().getBundle().loadClass(i);
- interfaceToListeners.putValue(interfaceClass, listener);
- } catch (IllegalStateException e) {
- logger.log(Level.WARNING, e.getMessage(), e);
- // Ignore the exception
- }
- } catch (ClassNotFoundException e) {
- // Ignore the listener as it cannot load the interface class
+ // The classloading can be synchronzed against the serviceListeners
+ Class<?> interfaceClass = listener.getBundleContext().getBundle().loadClass(i);
+ interfaceToListeners.putValue(interfaceClass, listener);
+ } catch (IllegalStateException e) {
+ logger.log(Level.WARNING, e.getMessage(), e);
+ // Ignore the exception
}
+ } catch (ClassNotFoundException e) {
+ // Ignore the listener as it cannot load the interface class
}
}
- return interfaceToListeners;
}
+ return interfaceToListeners;
}
/**
* @see org.osgi.framework.hooks.service.ListenerHook#removed(java.util.Collection)
*/
public void removed(Collection listeners) {
- synchronized (serviceListeners) {
- try {
+ boolean changed = false;
+ String[] filters = null;
+ try {
+ synchronized (serviceListeners) {
Collection<ListenerInfo> listenerInfos = (Collection<ListenerInfo>)listeners;
- boolean changed = false;
for (ListenerInfo l : listenerInfos) {
if (registration != null && l.getBundleContext() != context) {
String key = l.getFilter();
@@ -298,19 +311,22 @@ public class TopologyManagerImpl implements ListenerHook, RemoteServiceAdminList
}
}
if (changed) {
- updateEndpointListenerScope();
- }
- } catch (Throwable e) {
- logger.log(Level.SEVERE, e.getMessage(), e);
- if (e instanceof Error) {
- throw (Error)e;
- } else if (e instanceof RuntimeException) {
- throw (RuntimeException)e;
- } else {
- // Should not happen
- throw new RuntimeException(e);
+ filters = getFilters();
}
}
+ if (changed) {
+ updateEndpointListenerScope(filters);
+ }
+ } catch (Throwable e) {
+ logger.log(Level.SEVERE, e.getMessage(), e);
+ if (e instanceof Error) {
+ throw (Error)e;
+ } else if (e instanceof RuntimeException) {
+ throw (RuntimeException)e;
+ } else {
+ // Should not happen
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java
index 886e79197d..dbdbbaead8 100644
--- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java
+++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java
@@ -19,15 +19,12 @@
package org.apache.tuscany.sca.osgi.service.discovery.impl;
-import static org.osgi.service.remoteserviceadmin.EndpointListener.ENDPOINT_LISTENER_SCOPE;
-
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Dictionary;
-import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
@@ -35,9 +32,11 @@ import java.util.logging.Logger;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.LifeCycleListener;
+import org.apache.tuscany.sca.core.UtilityExtensionPoint;
import org.apache.tuscany.sca.node.NodeFactory;
import org.apache.tuscany.sca.node.impl.NodeFactoryImpl;
import org.apache.tuscany.sca.osgi.remoteserviceadmin.impl.OSGiHelper;
+import org.apache.tuscany.sca.work.WorkScheduler;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
@@ -60,13 +59,13 @@ public abstract class AbstractDiscoveryService implements Discovery, LifeCycleLi
protected BundleContext context;
protected ExtensionPointRegistry registry;
+ private WorkScheduler workScheduler;
- private Map<String, List<EndpointListener>> filtersToListeners = new HashMap<String, List<EndpointListener>>();
- // this is effectively a set which allows for multiple service descriptions with the
- // same interface name but different properties and takes care of itself with respect to concurrency
- protected Map<EndpointDescription, Bundle> endpointDescriptions = new ConcurrentHashMap<EndpointDescription, Bundle>();
private Map<EndpointListener, Collection<String>> listenersToFilters =
- new HashMap<EndpointListener, Collection<String>>();
+ new ConcurrentHashMap<EndpointListener, Collection<String>>();
+
+ protected Map<EndpointDescription, Bundle> endpointDescriptions =
+ new ConcurrentHashMap<EndpointDescription, Bundle>();
private ServiceTracker trackerTracker;
public AbstractDiscoveryService(BundleContext context) {
@@ -75,6 +74,10 @@ public abstract class AbstractDiscoveryService implements Discovery, LifeCycleLi
}
public void start() {
+ getExtensionPointRegistry();
+ UtilityExtensionPoint utilityExtensionPoint = registry.getExtensionPoint(UtilityExtensionPoint.class);
+ this.workScheduler = utilityExtensionPoint.getUtility(WorkScheduler.class);
+
// track the registration of EndpointListener
trackerTracker = new ServiceTracker(this.context, EndpointListener.class.getName(), null) {
public Object addingService(ServiceReference reference) {
@@ -122,66 +125,52 @@ public abstract class AbstractDiscoveryService implements Discovery, LifeCycleLi
return props;
}
- private synchronized void cacheTracker(ServiceReference reference, Object service) {
+ private void cacheTracker(ServiceReference reference, Object service) {
if (service instanceof EndpointListener) {
EndpointListener listener = (EndpointListener)service;
- Collection<String> filters =
- addTracker(reference, listener, ENDPOINT_LISTENER_SCOPE, filtersToListeners, listenersToFilters);
-
- triggerCallbacks(null, filters, listener, true);
+ Collection<String> filters = null;
+ Collection<EndpointDescription> endpoints = null;
+ synchronized (this) {
+ filters = addTracker(reference, listener, EndpointListener.ENDPOINT_LISTENER_SCOPE);
+ // Take a snapshot of the endpoints
+ triggerCallbacks(null, filters, listener);
+ }
}
}
- private synchronized void clearTracker(Object service) {
+ private void clearTracker(Object service) {
if (service instanceof EndpointListener) {
- removeTracker((EndpointListener)service, filtersToListeners, listenersToFilters);
+ synchronized (this) {
+ removeTracker((EndpointListener)service);
+ }
}
}
- private synchronized void updateTracker(ServiceReference reference, Object service) {
+ private void updateTracker(ServiceReference reference, Object service) {
if (service instanceof EndpointListener) {
EndpointListener listener = (EndpointListener)service;
- if (logger.isLoggable(Level.FINE)) {
- logger.fine("updating listener: " + listener);
+ Collection<String> oldFilters = null;
+ Collection<String> newFilters = null;
+ Collection<EndpointDescription> endpoints = null;
+ synchronized (this) {
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("updating listener: " + listener);
+ }
+ oldFilters = removeTracker(listener);
+ newFilters = addTracker(reference, listener, EndpointListener.ENDPOINT_LISTENER_SCOPE);
+ triggerCallbacks(oldFilters, newFilters, listener);
}
- Collection<String> oldFilters = removeTracker(listener, filtersToListeners, listenersToFilters);
-
- Collection<String> newFilters =
- addTracker(reference, listener, ENDPOINT_LISTENER_SCOPE, filtersToListeners, listenersToFilters);
-
- triggerCallbacks(oldFilters, newFilters, listener, true);
}
}
private void triggerCallbacks(Collection<String> oldInterest,
Collection<String> newInterest,
- EndpointListener listener,
- boolean isFilter) {
+ EndpointListener listener) {
// compute delta between old & new interfaces/filters and
// trigger callbacks for any entries in servicesInfo that
// match any *additional* interface/filters
- Collection<String> deltaInterest = new ArrayList<String>();
- if (newInterest != null && !newInterest.isEmpty()) {
- if (oldInterest == null || oldInterest.isEmpty()) {
- deltaInterest.addAll(newInterest);
- } else {
- Iterator<String> i = newInterest.iterator();
- while (i.hasNext()) {
- String next = (String)i.next();
- if (!oldInterest.contains(next)) {
- deltaInterest.add(next);
- }
- }
- }
- }
+ Collection<String> deltaInterest = getDelta(oldInterest, newInterest);
- if (logger.isLoggable(Level.FINE)) {
- if (endpointDescriptions.size() > 0) {
- logger.fine("search for matches to trigger callbacks with delta: " + deltaInterest);
- } else {
- logger.fine("nothing to search for matches to trigger callbacks with delta: " + deltaInterest);
- }
- }
Iterator<String> i = deltaInterest.iterator();
while (i.hasNext()) {
String next = i.next();
@@ -191,21 +180,74 @@ public abstract class AbstractDiscoveryService implements Discovery, LifeCycleLi
}
}
- private void triggerCallbacks(EndpointListener listener, String matchedFilter, EndpointDescription sd, int type) {
+ private Collection<String> getDelta(Collection<String> oldInterest, Collection<String> newInterest) {
+ if (newInterest == null) {
+ newInterest = Collections.emptySet();
+ }
+
+ Collection<String> deltaInterest = new ArrayList<String>(newInterest);
+ if (oldInterest == null) {
+ oldInterest = Collections.emptySet();
+ }
+ deltaInterest.removeAll(oldInterest);
+ return deltaInterest;
+ }
+
+ /**
+ * Notify the endpoint listener
+ * @param listener
+ * @param matchedFilter
+ * @param endpoint
+ * @param type
+ */
+ private static void notify(EndpointListener listener, String matchedFilter, EndpointDescription endpoint, int type) {
switch (type) {
case ADDED:
- listener.endpointAdded(sd, matchedFilter);
+ listener.endpointAdded(endpoint, matchedFilter);
break;
case REMOVED:
- listener.endpointRemoved(sd, matchedFilter);
+ listener.endpointRemoved(endpoint, matchedFilter);
break;
case MODIFIED:
- listener.endpointRemoved(sd, matchedFilter);
- listener.endpointAdded(sd, matchedFilter);
+ listener.endpointRemoved(endpoint, matchedFilter);
+ listener.endpointAdded(endpoint, matchedFilter);
break;
}
}
+ private static class Notifier implements Runnable {
+ private EndpointListener listener;
+ private String matchedFilter;
+ private EndpointDescription endpoint;
+ private int type;
+
+ /**
+ * @param listener
+ * @param matchedFilter
+ * @param endpoint
+ * @param type
+ */
+ public Notifier(EndpointListener listener, String matchedFilter, EndpointDescription endpoint, int type) {
+ super();
+ this.listener = listener;
+ this.matchedFilter = matchedFilter;
+ this.endpoint = endpoint;
+ this.type = type;
+ }
+
+ public void run() {
+ AbstractDiscoveryService.notify(listener, matchedFilter, endpoint, type);
+ }
+ }
+
+ private void triggerCallbacks(EndpointListener listener,
+ String matchedFilter,
+ EndpointDescription endpoint,
+ int type) {
+ workScheduler.scheduleWork(new Notifier(listener, matchedFilter, endpoint, type));
+
+ }
+
private boolean filterMatches(String filterValue, EndpointDescription sd) {
Filter filter = OSGiHelper.createFilter(context, filterValue);
Hashtable<String, Object> props = new Hashtable<String, Object>(sd.getProperties());
@@ -215,63 +257,25 @@ public abstract class AbstractDiscoveryService implements Discovery, LifeCycleLi
return filter != null ? filter.match(props) : false;
}
- static Collection<String> removeTracker(EndpointListener listener,
- Map<String, List<EndpointListener>> forwardMap,
- Map<EndpointListener, Collection<String>> reverseMap) {
- Collection<String> collection = reverseMap.get(listener);
- if (collection != null && !collection.isEmpty()) {
- reverseMap.remove(listener);
- Iterator<String> i = collection.iterator();
- while (i.hasNext()) {
- String element = i.next();
- if (forwardMap.containsKey(element)) {
- forwardMap.get(element).remove(listener);
- } else {
- // if the element wasn't on the forwardmap, its a new element and
- // shouldn't be returned as part of the collection of old ones
- i.remove();
- }
- }
- }
- return collection;
+ private Collection<String> removeTracker(EndpointListener listener) {
+ return listenersToFilters.remove(listener);
}
- @SuppressWarnings("unchecked")
- static Collection<String> addTracker(ServiceReference reference,
- EndpointListener listener,
- String property,
- Map<String, List<EndpointListener>> forwardMap,
- Map<EndpointListener, Collection<String>> reverseMap) {
+ private Collection<String> addTracker(ServiceReference reference, EndpointListener listener, String property) {
Collection<String> collection = OSGiHelper.getStringCollection(reference, property);
- if (logger.isLoggable(Level.FINE)) {
- logger.fine("adding listener: " + listener
- + " collection: "
- + collection
- + " registered against prop: "
- + property);
- }
if (collection != null && !collection.isEmpty()) {
- reverseMap.put(listener, new ArrayList<String>(collection));
- Iterator<String> i = collection.iterator();
- while (i.hasNext()) {
- String element = i.next();
- if (forwardMap.containsKey(element)) {
- forwardMap.get(element).add(listener);
- } else {
- List<EndpointListener> trackerList = new ArrayList<EndpointListener>();
- trackerList.add(listener);
- forwardMap.put(element, trackerList);
- }
- }
+ listenersToFilters.put(listener, new ArrayList<String>(collection));
}
return collection;
}
- protected synchronized void endpointChanged(EndpointDescription sd, int type) {
- for (Map.Entry<EndpointListener, Collection<String>> entry : listenersToFilters.entrySet()) {
- for (String filter : entry.getValue()) {
- if (filterMatches(filter, sd)) {
- triggerCallbacks(entry.getKey(), filter, sd, type);
+ protected void endpointChanged(EndpointDescription sd, int type) {
+ synchronized (this) {
+ for (Map.Entry<EndpointListener, Collection<String>> entry : listenersToFilters.entrySet()) {
+ for (String filter : entry.getValue()) {
+ if (filterMatches(filter, sd)) {
+ triggerCallbacks(entry.getKey(), filter, sd, type);
+ }
}
}
}
diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java
index a02be672a2..c9c4cb6a69 100644
--- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java
+++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java
@@ -49,7 +49,6 @@ public class DomainDiscoveryService extends AbstractDiscoveryService implements
public void start() {
super.start();
- getExtensionPointRegistry();
this.domainRegistryFactory =
registry.getExtensionPoint(UtilityExtensionPoint.class).getUtility(DomainRegistryFactory.class);
domainRegistryFactory.addListener(this);
@@ -80,7 +79,7 @@ public class DomainDiscoveryService extends AbstractDiscoveryService implements
}
}
- public synchronized void endpointAdded(Endpoint endpoint) {
+ public void endpointAdded(Endpoint endpoint) {
Implementation impl = endpoint.getComponent().getImplementation();
if (!(impl instanceof OSGiImplementation)) {
return;
@@ -94,20 +93,24 @@ public class DomainDiscoveryService extends AbstractDiscoveryService implements
bundleContext = bundle != null ? bundle.getBundleContext() : null;
}
- // Notify the endpoint listeners
- EndpointDescription description = createEndpointDescription(bundleContext, endpoint);
- // Set the owning bundle to runtime bundle to avoid NPE
+ // Notify the endpoint listeners
+ EndpointDescription description = createEndpointDescription(bundleContext, endpoint);
+ // Set the owning bundle to runtime bundle to avoid NPE
+ synchronized (this) {
endpointDescriptions.put(description, context.getBundle());
endpointChanged(description, ADDED);
+ }
}
- public synchronized void endpointRemoved(Endpoint endpoint) {
- EndpointDescription description = createEndpointDescription(context, endpoint);
+ public void endpointRemoved(Endpoint endpoint) {
+ EndpointDescription description = createEndpointDescription(context, endpoint);
+ synchronized (this) {
endpointDescriptions.remove(description);
endpointChanged(description, REMOVED);
+ }
}
- public synchronized void endpointUpdated(Endpoint oldEndpoint, Endpoint newEndpoint) {
+ public void endpointUpdated(Endpoint oldEndpoint, Endpoint newEndpoint) {
// FIXME: This is a quick and dirty way for the update
endpointRemoved(oldEndpoint);
endpointAdded(newEndpoint);
@@ -124,7 +127,7 @@ public class DomainDiscoveryService extends AbstractDiscoveryService implements
super.stop();
}
}
-
+
@Override
protected Dictionary<String, Object> getProperties() {
Dictionary<String, Object> props = super.getProperties();
diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/LocalDiscoveryService.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/LocalDiscoveryService.java
index 56a830a1e6..a31d05acf0 100644
--- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/LocalDiscoveryService.java
+++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/LocalDiscoveryService.java
@@ -63,7 +63,6 @@ public class LocalDiscoveryService extends AbstractDiscoveryService implements B
public void start() {
super.start();
- getExtensionPointRegistry();
UtilityExtensionPoint utilities = this.registry.getExtensionPoint(UtilityExtensionPoint.class);
this.deployer = utilities.getUtility(Deployer.class);