diff options
Diffstat (limited to 'sca-java-2.x/trunk/modules')
7 files changed, 447 insertions, 165 deletions
diff --git a/sca-java-2.x/trunk/modules/common-java/src/main/java/org/apache/tuscany/sca/common/java/collection/CollectionMap.java b/sca-java-2.x/trunk/modules/common-java/src/main/java/org/apache/tuscany/sca/common/java/collection/CollectionMap.java index cc3fb4676d..0db80c891d 100644 --- a/sca-java-2.x/trunk/modules/common-java/src/main/java/org/apache/tuscany/sca/common/java/collection/CollectionMap.java +++ b/sca-java-2.x/trunk/modules/common-java/src/main/java/org/apache/tuscany/sca/common/java/collection/CollectionMap.java @@ -48,13 +48,28 @@ public class CollectionMap<K, V> extends ConcurrentHashMap<K, Collection<V>> { } public boolean removeValue(K key, V value) { + return removeValue(key, value, false); + } + + /** + * Remove an entry from the collection for a key + * @param key The key + * @param value The value in the collection + * @param removeEmptyEntry Indicate if the entry should be removed if the collection is empty + * @return + */ + public boolean removeValue(K key, V value, boolean removeEmptyEntry) { Collection<V> collection = get(key); if (collection == null) { return false; } - return collection.remove(value); + boolean result = collection.remove(value); + if(removeEmptyEntry && collection.isEmpty()) { + remove(key); + } + return result; } - + protected Collection<V> createCollection() { return new ArrayList<V>(); } diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointIntrospector.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointIntrospector.java index 412e119c76..ffd619a4c2 100644 --- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointIntrospector.java +++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointIntrospector.java @@ -100,21 +100,21 @@ public class EndpointIntrospector { private ServiceTracker discoveryTracker; /** - * @param intentName + * @param name * @return */ - private static QName getQName(String intentName) { + private static QName getQName(String name) { QName qname; - if (intentName.startsWith("{")) { - int i = intentName.indexOf('}'); + if (name.startsWith("{")) { + int i = name.indexOf('}'); if (i != -1) { - qname = new QName(intentName.substring(1, i), intentName.substring(i + 1)); + qname = new QName(name.substring(1, i), name.substring(i + 1)); } else { - throw new IllegalArgumentException("Invalid intent: " + intentName); + throw new IllegalArgumentException("Invalid qname: " + name); } } else { // Default to SCA namespace - qname = new QName(Base.SCA11_NS, intentName); + qname = new QName("", name); } return qname; } @@ -474,7 +474,8 @@ public class EndpointIntrospector { for (ExtenderConfiguration config : discoveryService.getConfigurations()) { for (SCAConfig sc : config.getSCAConfigs()) { for (QName bindingName : bindingNames) { - if (sc.getTargetNamespace().equals(bindingName.getNamespaceURI())) { + if ("".equals(bindingName.getNamespaceURI()) || sc.getTargetNamespace().equals(bindingName + .getNamespaceURI())) { for (Binding binding : sc.getBindings()) { if (bindingName.getLocalPart().equals(binding.getName())) { bindingMap.put(bindingName, binding); diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointMatcher.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointMatcher.java new file mode 100644 index 0000000000..b393960fd6 --- /dev/null +++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointMatcher.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.osgi.remoteserviceadmin.impl; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.tuscany.sca.common.java.collection.CollectionMap; +import org.osgi.framework.BundleContext; +import org.osgi.framework.Filter; +import org.osgi.framework.FrameworkUtil; +import org.osgi.framework.InvalidSyntaxException; +import org.osgi.framework.hooks.service.ListenerHook.ListenerInfo; +import org.osgi.service.remoteserviceadmin.EndpointDescription; + +/** + * Matching endpoint descriptions against the sevice listeners using OSGi filiters + */ +public class EndpointMatcher { + private static final Logger logger = Logger.getLogger(EndpointMatcher.class.getName()); + private final EndpointMap endpointDescriptions = new EndpointMap(); + private final ListenerMap listeners = new ListenerMap(); + private final BundleContext context; + private final BlockingQueue<ImportAction> importQueue = new ArrayBlockingQueue<ImportAction>(256, true); + + public EndpointMatcher(BundleContext context) { + super(); + this.context = context; + } + + public static boolean matches(String filter, EndpointDescription endpointDescription) { + Filter f = null; + try { + f = FrameworkUtil.createFilter(filter); + } catch (InvalidSyntaxException e) { + throw new IllegalArgumentException(e); + } + Hashtable<String, Object> props = new Hashtable<String, Object>(endpointDescription.getProperties()); + return f.match(props); + } + + private void importEndpoint(ListenerInfo listener, EndpointDescription ep) { + ImportAction request = new ImportAction(ImportAction.Type.Add, listener, ep); + try { + importQueue.put(request); + } catch (InterruptedException e) { + throw new IllegalArgumentException(e); + } + } + + private void unimportEndpoint(ListenerInfo listener, EndpointDescription ep) { + ImportAction request = new ImportAction(ImportAction.Type.Remove, listener, ep); + try { + importQueue.put(request); + } catch (InterruptedException e) { + throw new IllegalArgumentException(e); + } + } + + public synchronized void added(ListenerInfo listener) { + String filter = listener.getFilter(); + listeners.putValue(filter, listener); + for (EndpointDescription ep : getEndpoints(filter)) { + importEndpoint(listener, ep); + } + } + + public synchronized Collection<String> added(Collection<ListenerInfo> listeners) { + for (ListenerInfo listener : listeners) { + if (accepts(listener)) { + if (!listener.isRemoved() && listener.getBundleContext().getBundle().getBundleId() != 0L) { + added(listener); + } + } + } + return getFilters(); + } + + private boolean accepts(ListenerInfo listener) { + BundleContext context = listener.getBundleContext(); + return context != null && listener.getFilter() != null && context != this.context; + } + + public synchronized void removed(ListenerInfo listener) { + String filter = listener.getFilter(); + if (accepts(listener)) + if (listeners.removeValue(filter, listener, true)) { + // Find the corresponding ImportRegistration with the listener + for (EndpointDescription ep : getEndpoints(filter)) { + unimportEndpoint(listener, ep); + } + if (getListeners(filter).isEmpty()) { + // No more listeners on the this filter, clean up the endpoint descriptionss + endpointDescriptions.remove(filter); + } + + } + } + + public synchronized Collection<String> removed(Collection<ListenerInfo> listeners) { + for (ListenerInfo listener : listeners) { + removed(listener); + } + return getFilters(); + } + + public synchronized void added(EndpointDescription endpointDescription) { + for (Map.Entry<String, Collection<ListenerInfo>> entry : listeners.entrySet()) { + if (matches(entry.getKey(), endpointDescription)) { + endpointDescriptions.putValue(entry.getKey(), endpointDescription); + for (ListenerInfo listener : entry.getValue()) { + importEndpoint(listener, endpointDescription); + } + } + } + } + + public synchronized void added(EndpointDescription endpointDescription, String matchedFilter) { + if (endpointDescriptions.putValue(matchedFilter, endpointDescription)) { + Collection<ListenerInfo> listenerInfos = listeners.get(matchedFilter); + for (ListenerInfo listener : listenerInfos) { + importEndpoint(listener, endpointDescription); + } + } + } + + public synchronized void removed(EndpointDescription endpointDescription, String matchedFilter) { + if (endpointDescriptions.removeValue(matchedFilter, endpointDescription, true)) { + for (ListenerInfo listener : getListeners(matchedFilter)) { + unimportEndpoint(listener, endpointDescription); + } + } + } + + public synchronized Set<String> getFilters() { + return new HashSet<String>(listeners.keySet()); + } + + public synchronized void clear() { + endpointDescriptions.clear(); + listeners.clear(); + importQueue.clear(); + } + + public synchronized Collection<ListenerInfo> getListeners(String filter) { + Collection<ListenerInfo> collection = listeners.get(filter); + if (collection == null) { + return Collections.emptySet(); + } else { + return collection; + } + } + + public synchronized Collection<EndpointDescription> getEndpoints(String filter) { + Collection<EndpointDescription> collection = endpointDescriptions.get(filter); + if (collection == null) { + return Collections.emptySet(); + } else { + return collection; + } + } + + public CollectionMap<Class<?>, ListenerInfo> groupListeners(EndpointDescription endpointDescription, + String matchedFilter) { + Collection<ListenerInfo> snapshot = new HashSet<ListenerInfo>(getListeners(matchedFilter)); + + // Try to partition the listeners by the interface classes + List<String> interfaceNames = endpointDescription.getInterfaces(); + CollectionMap<Class<?>, ListenerInfo> interfaceToListeners = new CollectionMap<Class<?>, ListenerInfo>(); + for (String i : interfaceNames) { + for (Iterator<ListenerInfo> it = snapshot.iterator(); it.hasNext();) { + try { + ListenerInfo listener = it.next(); + if (listener.isRemoved()) { + it.remove(); + continue; + } + if (!matchedFilter.equals(listener.getFilter())) { + continue; + } + try { + // The classloading can be synchronzed against the serviceListeners + Class<?> interfaceClass = listener.getBundleContext().getBundle().loadClass(i); + interfaceToListeners.putValue(interfaceClass, listener); + } catch (IllegalStateException e) { + logger.log(Level.WARNING, e.getMessage(), e); + // Ignore the exception + } + } catch (ClassNotFoundException e) { + // Ignore the listener as it cannot load the interface class + } + } + } + return interfaceToListeners; + } + + public BlockingQueue<ImportAction> getImportQueue() { + return importQueue; + } + + private static class ListenerMap extends CollectionMap<String, ListenerInfo> { + private static final long serialVersionUID = -8612202123531331219L; + + @Override + protected Collection<ListenerInfo> createCollection() { + return new HashSet<ListenerInfo>(); + } + } + + private static class EndpointMap extends CollectionMap<String, EndpointDescription> { + private static final long serialVersionUID = -6261405398109798549L; + + @Override + protected Collection<EndpointDescription> createCollection() { + return new HashSet<EndpointDescription>(); + } + } + + /** + * Representation of an import/unimport request + */ + public static class ImportAction { + enum Type { + Add, Remove + }; + + public final Type type; + public final ListenerInfo listenerInfo; + public final EndpointDescription endpointDescription; + + /** + * @param type + * @param listenerInfo + * @param endpointDescription + */ + public ImportAction(Type type, ListenerInfo listenerInfo, EndpointDescription endpointDescription) { + super(); + this.type = type; + this.listenerInfo = listenerInfo; + this.endpointDescription = endpointDescription; + } + } + +} diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/OSGiHelper.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/OSGiHelper.java index b0c55fea71..460d291f53 100644 --- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/OSGiHelper.java +++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/OSGiHelper.java @@ -145,20 +145,21 @@ public class OSGiHelper { } return files; } - + public static Collection<OSGiProperty> getOSGiProperties(ExtensionPointRegistry registry, ServiceReference reference) { FactoryExtensionPoint factoryExtensionPoint = registry.getExtensionPoint(FactoryExtensionPoint.class); - OSGiImplementationFactory implementationFactory= factoryExtensionPoint.getFactory(OSGiImplementationFactory.class); + OSGiImplementationFactory implementationFactory = + factoryExtensionPoint.getFactory(OSGiImplementationFactory.class); return implementationFactory.createOSGiProperties(reference); } - + public static OSGiProperty createOSGiProperty(ExtensionPointRegistry registry, String name, Object value) { FactoryExtensionPoint factoryExtensionPoint = registry.getExtensionPoint(FactoryExtensionPoint.class); - OSGiImplementationFactory implementationFactory= factoryExtensionPoint.getFactory(OSGiImplementationFactory.class); + OSGiImplementationFactory implementationFactory = + factoryExtensionPoint.getFactory(OSGiImplementationFactory.class); return implementationFactory.createOSGiProperty(name, value); } - public synchronized static String getFrameworkUUID(BundleContext bundleContext) { String uuid = null; if (bundleContext != null) { @@ -171,14 +172,15 @@ public class OSGiHelper { } System.setProperty(FRAMEWORK_UUID, uuid); return uuid; - } - + } + public static ClassLoader createBundleClassLoader(Bundle bundle) { return new BundleClassLoader(bundle); } - + private static class BundleClassLoader extends ClassLoader { private Bundle bundle; + public BundleClassLoader(Bundle bundle) { super(null); this.bundle = bundle; @@ -204,6 +206,29 @@ public class OSGiHelper { return urls; } } - } + } + + /** + * Find out what elements are added between the oldValues and newValues + * @param oldValues + * @param newValues + * @return + */ + public static Collection<String> getAddedItems(Collection<String> oldValues, Collection<String> newValues) { + if (newValues == null) { + newValues = Collections.emptySet(); + } + + Collection<String> deltaInterest = new HashSet<String>(newValues); + if (oldValues == null) { + oldValues = Collections.emptySet(); + } + deltaInterest.removeAll(oldValues); + return deltaInterest; + } + + public static Collection<String> getRemovedItems(Collection<String> oldValues, Collection<String> newValues) { + return getAddedItems(newValues, oldValues); + } } diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java index b1ebeb9df0..b6b80c8524 100644 --- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java +++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java @@ -259,6 +259,10 @@ public class RemoteServiceAdminImpl implements RemoteServiceAdmin, ManagedServic props.put("objectClass", ep.getInterfaces()); props.put("service.imported.configs", ep.getConfigurationTypes()); props.put("timestamp", new Long(System.currentTimeMillis())); + Object bindings = ep.getProperties().get("org.osgi.sca.bindings"); + if (bindings != null) { + props.put("org.osgi.sca.bindings", bindings); + } props.put("event", rsaEvent); return new Event(topic, props); } diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/TopologyManagerImpl.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/TopologyManagerImpl.java index 97c8b345fa..a3f3636cce 100644 --- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/TopologyManagerImpl.java +++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/TopologyManagerImpl.java @@ -23,21 +23,20 @@ import static org.apache.tuscany.sca.implementation.osgi.OSGiProperty.SERVICE_EX import static org.osgi.service.remoteserviceadmin.RemoteConstants.SERVICE_EXPORTED_CONFIGS; import static org.osgi.service.remoteserviceadmin.RemoteConstants.SERVICE_IMPORTED; -import java.util.ArrayList; import java.util.Collection; import java.util.Dictionary; import java.util.HashMap; -import java.util.HashSet; import java.util.Hashtable; import java.util.Iterator; -import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.tuscany.sca.common.java.collection.CollectionMap; import org.apache.tuscany.sca.core.LifeCycleListener; +import org.apache.tuscany.sca.osgi.remoteserviceadmin.impl.EndpointMatcher.ImportAction; import org.osgi.framework.Bundle; import org.osgi.framework.BundleContext; import org.osgi.framework.Constants; @@ -67,23 +66,23 @@ public class TopologyManagerImpl implements ListenerHook, RemoteServiceAdminList private BundleContext context; private ServiceTracker remoteAdmins; - private ServiceRegistration registration; + private volatile ServiceRegistration registration; private ServiceRegistration endpointListener; private ServiceTracker remotableServices; - // Service listeners keyed by the filter - private CollectionMap<String, ListenerInfo> serviceListeners = new CollectionMap<String, ListenerInfo>(); + private EndpointMatcher endpointMatcher; private CollectionMap<ServiceReference, ExportRegistration> exportedServices = new CollectionMap<ServiceReference, ExportRegistration>(); - private CollectionMap<EndpointDescription, ImportRegistration> importedServices = - new CollectionMap<EndpointDescription, ImportRegistration>(); + private CollectionMap<ImportKey, ImportRegistration> importedServices = + new CollectionMap<ImportKey, ImportRegistration>(); private Filter remotableServiceFilter; public TopologyManagerImpl(BundleContext context) { this.context = context; + this.endpointMatcher = new EndpointMatcher(context); } public void start() { @@ -113,37 +112,10 @@ public class TopologyManagerImpl implements ListenerHook, RemoteServiceAdminList remotableServices = new ServiceTracker(context, remotableServiceFilter, this); remotableServices.open(true); + Thread thread = new Thread(new ImportTask()); + thread.start(); } - /** - * @see org.osgi.framework.hooks.service.EventHook#event(org.osgi.framework.ServiceEvent, - * java.util.Collection) - */ - /* - public void event(ServiceEvent event, Collection contexts) { - ServiceReference reference = event.getServiceReference(); - if (!remotableServiceFilter.match(reference)) { - // Only export remotable services that are for SCA - return; - } - switch (event.getType()) { - case ServiceEvent.REGISTERED: - exportService(reference); - break; - case ServiceEvent.UNREGISTERING: - unexportService(reference); - break; - case ServiceEvent.MODIFIED: - case ServiceEvent.MODIFIED_ENDMATCH: - // First check if the property changes will impact - // Call remote admin to update the service - unexportService(reference); - exportService(reference); - break; - } - } - */ - public Object addingService(ServiceReference reference) { exportService(reference); return reference.getBundle().getBundleContext().getService(reference); @@ -191,39 +163,13 @@ public class TopologyManagerImpl implements ListenerHook, RemoteServiceAdminList * @see org.osgi.framework.hooks.service.ListenerHook#added(java.util.Collection) */ public void added(Collection listeners) { - boolean changed = false; - String[] filters = null; try { - synchronized (serviceListeners) { - Collection<ListenerInfo> listenerInfos = (Collection<ListenerInfo>)listeners; - for (ListenerInfo l : listenerInfos) { - if (l.getBundleContext().getBundle().getBundleId() == 0L || l.getBundleContext() == context) { - // Ignore system and tuscany bundle - continue; - } - if (!l.isRemoved()) { - String key = l.getFilter(); - if (key == null) { - // key = ""; - // FIXME: It should always match, let's ignore it for now - logger.warning("Service listner without a filter is skipped: " + l); - continue; - } - Collection<ListenerInfo> infos = serviceListeners.get(key); - if (infos == null) { - infos = new HashSet<ListenerInfo>(); - serviceListeners.put(key, infos); - } - infos.add(l); - changed = true; - } + synchronized (endpointMatcher) { + Collection<String> oldFilters = endpointMatcher.getFilters(); + Collection<String> newFilters = endpointMatcher.added(listeners); + if (!OSGiHelper.getAddedItems(oldFilters, newFilters).isEmpty()) { + updateEndpointListenerScope(newFilters); } - if (changed) { - filters = getFilters(); - } - } - if (changed) { - updateEndpointListenerScope(filters); } } catch (Throwable e) { logger.log(Level.SEVERE, e.getMessage(), e); @@ -238,85 +184,24 @@ public class TopologyManagerImpl implements ListenerHook, RemoteServiceAdminList } } - private void updateEndpointListenerScope(String[] filters) { + private void updateEndpointListenerScope(Collection<String> filters) { Dictionary<String, Object> props = new Hashtable<String, Object>(); props.put(ENDPOINT_LISTENER_SCOPE, filters); endpointListener.setProperties(props); } - private String[] getFilters() { - Set<String> filterSet = serviceListeners.keySet(); - String[] filters = filterSet.toArray(new String[filterSet.size()]); - return filters; - } - - private CollectionMap<Class<?>, ListenerInfo> findServiceListeners(EndpointDescription endpointDescription, - String matchedFilter) { - Collection<ListenerInfo> listeners = null; - synchronized (serviceListeners) { - - // First find all the listeners that have the matching filter - listeners = serviceListeners.get(matchedFilter); - if (listeners == null) { - return null; - } - listeners = new ArrayList<ListenerInfo>(listeners); - } - - // Try to partition the listeners by the interface classes - List<String> interfaceNames = endpointDescription.getInterfaces(); - CollectionMap<Class<?>, ListenerInfo> interfaceToListeners = new CollectionMap<Class<?>, ListenerInfo>(); - for (String i : interfaceNames) { - for (Iterator<ListenerInfo> it = listeners.iterator(); it.hasNext();) { - try { - ListenerInfo listener = it.next(); - if (listener.isRemoved()) { - it.remove(); - continue; - } - try { - // The classloading can be synchronzed against the serviceListeners - Class<?> interfaceClass = listener.getBundleContext().getBundle().loadClass(i); - interfaceToListeners.putValue(interfaceClass, listener); - } catch (IllegalStateException e) { - logger.log(Level.WARNING, e.getMessage(), e); - // Ignore the exception - } - } catch (ClassNotFoundException e) { - // Ignore the listener as it cannot load the interface class - } - } - } - return interfaceToListeners; - } - /** * @see org.osgi.framework.hooks.service.ListenerHook#removed(java.util.Collection) */ public void removed(Collection listeners) { - boolean changed = false; - String[] filters = null; try { - synchronized (serviceListeners) { - Collection<ListenerInfo> listenerInfos = (Collection<ListenerInfo>)listeners; - for (ListenerInfo l : listenerInfos) { - if (registration != null && l.getBundleContext() != context) { - String key = l.getFilter(); - if (key == null) { - continue; - } - if (serviceListeners.removeValue(key, l)) { - changed = true; - } - } - } - if (changed) { - filters = getFilters(); + synchronized (endpointMatcher) { + Collection<String> oldFilters = endpointMatcher.getFilters(); + Collection<String> newFilters = endpointMatcher.removed(listeners); + if (!OSGiHelper.getRemovedItems(oldFilters, newFilters).isEmpty()) { + updateEndpointListenerScope(newFilters); } } - if (changed) { - updateEndpointListenerScope(filters); - } } catch (Throwable e) { logger.log(Level.SEVERE, e.getMessage(), e); if (e instanceof Error) { @@ -353,24 +238,27 @@ public class TopologyManagerImpl implements ListenerHook, RemoteServiceAdminList * java.lang.String) */ public void endpointAdded(EndpointDescription endpoint, String matchedFilter) { - importService(endpoint, matchedFilter); + endpointMatcher.added(endpoint, matchedFilter); + // importService(endpoint, matchedFilter); } /** * @see org.osgi.remoteserviceadmin.EndpointListener#removeEndpoint(org.osgi.service.remoteserviceadmin.EndpointDescription) */ public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) { - unimportService(endpoint); + endpointMatcher.removed(endpoint, matchedFilter); + // unimportService(endpoint); } private void importService(EndpointDescription endpoint, String matchedFilter) { Object[] admins = remoteAdmins.getServices(); if (admins == null) { - logger.warning("No RemoteAdmin services are available."); + logger.warning("No Remote Service Admin services are available."); return; } - CollectionMap<Class<?>, ListenerInfo> interfaceToListeners = findServiceListeners(endpoint, matchedFilter); + CollectionMap<Class<?>, ListenerInfo> interfaceToListeners = + endpointMatcher.groupListeners(endpoint, matchedFilter); for (Map.Entry<Class<?>, Collection<ListenerInfo>> e : interfaceToListeners.entrySet()) { Class<?> interfaceClass = e.getKey(); Collection<ListenerInfo> listeners = e.getValue(); @@ -401,16 +289,17 @@ public class TopologyManagerImpl implements ListenerHook, RemoteServiceAdminList RemoteServiceAdmin remoteAdmin = (RemoteServiceAdmin)ra; ImportRegistration importRegistration = remoteAdmin.importService(description); if (importRegistration != null) { - importedServices.putValue(endpoint, importRegistration); + importedServices.putValue(new ImportKey(description, listener), importRegistration); } } } } } - private void unimportService(EndpointDescription endpoint) { + private void unimportService(EndpointDescription endpoint, ListenerInfo listenerInfo) { // Call remote admin to unimport the service - Collection<ImportRegistration> importRegistrations = importedServices.get(endpoint); + Collection<ImportRegistration> importRegistrations = + importedServices.get(new ImportKey(endpoint, listenerInfo)); if (importRegistrations != null) { for (Iterator<ImportRegistration> i = importRegistrations.iterator(); i.hasNext();) { ImportRegistration imported = i.next(); @@ -435,8 +324,76 @@ public class TopologyManagerImpl implements ListenerHook, RemoteServiceAdminList remoteAdmins.close(); remoteAdmins = null; } - synchronized (serviceListeners) { - serviceListeners.clear(); + if (endpointMatcher != null) { + endpointMatcher.clear(); + } + } + + private class ImportTask implements Runnable { + public void run() { + while (registration != null) { + BlockingQueue<EndpointMatcher.ImportAction> queue = endpointMatcher.getImportQueue(); + ImportAction action = null; + try { + action = queue.poll(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // Ignore + } + if (action != null) { + if (action.type == ImportAction.Type.Add) { + importService(action.endpointDescription, action.listenerInfo.getFilter()); + } else if (action.type == ImportAction.Type.Remove) { + unimportService(action.endpointDescription, action.listenerInfo); + } + } + } + } + } + + private static class ImportKey { + private EndpointDescription endpointDescription; + + /** + * @param endpointDescription + * @param listenerInfo + */ + private ImportKey(EndpointDescription endpointDescription, ListenerInfo listenerInfo) { + super(); + this.endpointDescription = endpointDescription; + this.listenerInfo = listenerInfo; + } + + private ListenerInfo listenerInfo; + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((endpointDescription == null) ? 0 : endpointDescription.hashCode()); + result = prime * result + ((listenerInfo == null) ? 0 : listenerInfo.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + ImportKey other = (ImportKey)obj; + if (endpointDescription == null) { + if (other.endpointDescription != null) + return false; + } else if (!endpointDescription.equals(other.endpointDescription)) + return false; + if (listenerInfo == null) { + if (other.listenerInfo != null) + return false; + } else if (!listenerInfo.equals(other.listenerInfo)) + return false; + return true; } } diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java index dbdbbaead8..e9260cef53 100644 --- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java +++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java @@ -151,7 +151,6 @@ public abstract class AbstractDiscoveryService implements Discovery, LifeCycleLi EndpointListener listener = (EndpointListener)service; Collection<String> oldFilters = null; Collection<String> newFilters = null; - Collection<EndpointDescription> endpoints = null; synchronized (this) { if (logger.isLoggable(Level.FINE)) { logger.fine("updating listener: " + listener); @@ -178,6 +177,16 @@ public abstract class AbstractDiscoveryService implements Discovery, LifeCycleLi triggerCallbacks(listener, next, sd, ADDED); } } + // Find removed filters + deltaInterest = getDelta(newInterest, oldInterest); + + i = deltaInterest.iterator(); + while (i.hasNext()) { + String next = i.next(); + for (EndpointDescription sd : endpointDescriptions.keySet()) { + triggerCallbacks(listener, next, sd, REMOVED); + } + } } private Collection<String> getDelta(Collection<String> oldInterest, Collection<String> newInterest) { |