From 5cd7355e62c22cd5d342082b7c89aa569ad3e6b3 Mon Sep 17 00:00:00 2001 From: rfeng Date: Tue, 2 Feb 2010 06:53:38 +0000 Subject: Improve the matching between endpoints and listeners to pass the compliance tests git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@905532 13f79535-47bb-0310-9956-ffa450edef68 --- .../impl/EndpointIntrospector.java | 17 +- .../remoteserviceadmin/impl/EndpointMatcher.java | 271 +++++++++++++++++++++ .../osgi/remoteserviceadmin/impl/OSGiHelper.java | 43 +++- .../impl/RemoteServiceAdminImpl.java | 4 + .../impl/TopologyManagerImpl.java | 247 ++++++++----------- .../discovery/impl/AbstractDiscoveryService.java | 11 +- 6 files changed, 430 insertions(+), 163 deletions(-) create mode 100644 sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointMatcher.java (limited to 'sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org') diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointIntrospector.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointIntrospector.java index 412e119c76..ffd619a4c2 100644 --- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointIntrospector.java +++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointIntrospector.java @@ -100,21 +100,21 @@ public class EndpointIntrospector { private ServiceTracker discoveryTracker; /** - * @param intentName + * @param name * @return */ - private static QName getQName(String intentName) { + private static QName getQName(String name) { QName qname; - if (intentName.startsWith("{")) { - int i = intentName.indexOf('}'); + if (name.startsWith("{")) { + int i = name.indexOf('}'); if (i != -1) { - qname = new QName(intentName.substring(1, i), intentName.substring(i + 1)); + qname = new QName(name.substring(1, i), name.substring(i + 1)); } else { - throw new IllegalArgumentException("Invalid intent: " + intentName); + throw new IllegalArgumentException("Invalid qname: " + name); } } else { // Default to SCA namespace - qname = new QName(Base.SCA11_NS, intentName); + qname = new QName("", name); } return qname; } @@ -474,7 +474,8 @@ public class EndpointIntrospector { for (ExtenderConfiguration config : discoveryService.getConfigurations()) { for (SCAConfig sc : config.getSCAConfigs()) { for (QName bindingName : bindingNames) { - if (sc.getTargetNamespace().equals(bindingName.getNamespaceURI())) { + if ("".equals(bindingName.getNamespaceURI()) || sc.getTargetNamespace().equals(bindingName + .getNamespaceURI())) { for (Binding binding : sc.getBindings()) { if (bindingName.getLocalPart().equals(binding.getName())) { bindingMap.put(bindingName, binding); diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointMatcher.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointMatcher.java new file mode 100644 index 0000000000..b393960fd6 --- /dev/null +++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointMatcher.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.osgi.remoteserviceadmin.impl; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.tuscany.sca.common.java.collection.CollectionMap; +import org.osgi.framework.BundleContext; +import org.osgi.framework.Filter; +import org.osgi.framework.FrameworkUtil; +import org.osgi.framework.InvalidSyntaxException; +import org.osgi.framework.hooks.service.ListenerHook.ListenerInfo; +import org.osgi.service.remoteserviceadmin.EndpointDescription; + +/** + * Matching endpoint descriptions against the sevice listeners using OSGi filiters + */ +public class EndpointMatcher { + private static final Logger logger = Logger.getLogger(EndpointMatcher.class.getName()); + private final EndpointMap endpointDescriptions = new EndpointMap(); + private final ListenerMap listeners = new ListenerMap(); + private final BundleContext context; + private final BlockingQueue importQueue = new ArrayBlockingQueue(256, true); + + public EndpointMatcher(BundleContext context) { + super(); + this.context = context; + } + + public static boolean matches(String filter, EndpointDescription endpointDescription) { + Filter f = null; + try { + f = FrameworkUtil.createFilter(filter); + } catch (InvalidSyntaxException e) { + throw new IllegalArgumentException(e); + } + Hashtable props = new Hashtable(endpointDescription.getProperties()); + return f.match(props); + } + + private void importEndpoint(ListenerInfo listener, EndpointDescription ep) { + ImportAction request = new ImportAction(ImportAction.Type.Add, listener, ep); + try { + importQueue.put(request); + } catch (InterruptedException e) { + throw new IllegalArgumentException(e); + } + } + + private void unimportEndpoint(ListenerInfo listener, EndpointDescription ep) { + ImportAction request = new ImportAction(ImportAction.Type.Remove, listener, ep); + try { + importQueue.put(request); + } catch (InterruptedException e) { + throw new IllegalArgumentException(e); + } + } + + public synchronized void added(ListenerInfo listener) { + String filter = listener.getFilter(); + listeners.putValue(filter, listener); + for (EndpointDescription ep : getEndpoints(filter)) { + importEndpoint(listener, ep); + } + } + + public synchronized Collection added(Collection listeners) { + for (ListenerInfo listener : listeners) { + if (accepts(listener)) { + if (!listener.isRemoved() && listener.getBundleContext().getBundle().getBundleId() != 0L) { + added(listener); + } + } + } + return getFilters(); + } + + private boolean accepts(ListenerInfo listener) { + BundleContext context = listener.getBundleContext(); + return context != null && listener.getFilter() != null && context != this.context; + } + + public synchronized void removed(ListenerInfo listener) { + String filter = listener.getFilter(); + if (accepts(listener)) + if (listeners.removeValue(filter, listener, true)) { + // Find the corresponding ImportRegistration with the listener + for (EndpointDescription ep : getEndpoints(filter)) { + unimportEndpoint(listener, ep); + } + if (getListeners(filter).isEmpty()) { + // No more listeners on the this filter, clean up the endpoint descriptionss + endpointDescriptions.remove(filter); + } + + } + } + + public synchronized Collection removed(Collection listeners) { + for (ListenerInfo listener : listeners) { + removed(listener); + } + return getFilters(); + } + + public synchronized void added(EndpointDescription endpointDescription) { + for (Map.Entry> entry : listeners.entrySet()) { + if (matches(entry.getKey(), endpointDescription)) { + endpointDescriptions.putValue(entry.getKey(), endpointDescription); + for (ListenerInfo listener : entry.getValue()) { + importEndpoint(listener, endpointDescription); + } + } + } + } + + public synchronized void added(EndpointDescription endpointDescription, String matchedFilter) { + if (endpointDescriptions.putValue(matchedFilter, endpointDescription)) { + Collection listenerInfos = listeners.get(matchedFilter); + for (ListenerInfo listener : listenerInfos) { + importEndpoint(listener, endpointDescription); + } + } + } + + public synchronized void removed(EndpointDescription endpointDescription, String matchedFilter) { + if (endpointDescriptions.removeValue(matchedFilter, endpointDescription, true)) { + for (ListenerInfo listener : getListeners(matchedFilter)) { + unimportEndpoint(listener, endpointDescription); + } + } + } + + public synchronized Set getFilters() { + return new HashSet(listeners.keySet()); + } + + public synchronized void clear() { + endpointDescriptions.clear(); + listeners.clear(); + importQueue.clear(); + } + + public synchronized Collection getListeners(String filter) { + Collection collection = listeners.get(filter); + if (collection == null) { + return Collections.emptySet(); + } else { + return collection; + } + } + + public synchronized Collection getEndpoints(String filter) { + Collection collection = endpointDescriptions.get(filter); + if (collection == null) { + return Collections.emptySet(); + } else { + return collection; + } + } + + public CollectionMap, ListenerInfo> groupListeners(EndpointDescription endpointDescription, + String matchedFilter) { + Collection snapshot = new HashSet(getListeners(matchedFilter)); + + // Try to partition the listeners by the interface classes + List interfaceNames = endpointDescription.getInterfaces(); + CollectionMap, ListenerInfo> interfaceToListeners = new CollectionMap, ListenerInfo>(); + for (String i : interfaceNames) { + for (Iterator it = snapshot.iterator(); it.hasNext();) { + try { + ListenerInfo listener = it.next(); + if (listener.isRemoved()) { + it.remove(); + continue; + } + if (!matchedFilter.equals(listener.getFilter())) { + continue; + } + try { + // The classloading can be synchronzed against the serviceListeners + Class interfaceClass = listener.getBundleContext().getBundle().loadClass(i); + interfaceToListeners.putValue(interfaceClass, listener); + } catch (IllegalStateException e) { + logger.log(Level.WARNING, e.getMessage(), e); + // Ignore the exception + } + } catch (ClassNotFoundException e) { + // Ignore the listener as it cannot load the interface class + } + } + } + return interfaceToListeners; + } + + public BlockingQueue getImportQueue() { + return importQueue; + } + + private static class ListenerMap extends CollectionMap { + private static final long serialVersionUID = -8612202123531331219L; + + @Override + protected Collection createCollection() { + return new HashSet(); + } + } + + private static class EndpointMap extends CollectionMap { + private static final long serialVersionUID = -6261405398109798549L; + + @Override + protected Collection createCollection() { + return new HashSet(); + } + } + + /** + * Representation of an import/unimport request + */ + public static class ImportAction { + enum Type { + Add, Remove + }; + + public final Type type; + public final ListenerInfo listenerInfo; + public final EndpointDescription endpointDescription; + + /** + * @param type + * @param listenerInfo + * @param endpointDescription + */ + public ImportAction(Type type, ListenerInfo listenerInfo, EndpointDescription endpointDescription) { + super(); + this.type = type; + this.listenerInfo = listenerInfo; + this.endpointDescription = endpointDescription; + } + } + +} diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/OSGiHelper.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/OSGiHelper.java index b0c55fea71..460d291f53 100644 --- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/OSGiHelper.java +++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/OSGiHelper.java @@ -145,20 +145,21 @@ public class OSGiHelper { } return files; } - + public static Collection getOSGiProperties(ExtensionPointRegistry registry, ServiceReference reference) { FactoryExtensionPoint factoryExtensionPoint = registry.getExtensionPoint(FactoryExtensionPoint.class); - OSGiImplementationFactory implementationFactory= factoryExtensionPoint.getFactory(OSGiImplementationFactory.class); + OSGiImplementationFactory implementationFactory = + factoryExtensionPoint.getFactory(OSGiImplementationFactory.class); return implementationFactory.createOSGiProperties(reference); } - + public static OSGiProperty createOSGiProperty(ExtensionPointRegistry registry, String name, Object value) { FactoryExtensionPoint factoryExtensionPoint = registry.getExtensionPoint(FactoryExtensionPoint.class); - OSGiImplementationFactory implementationFactory= factoryExtensionPoint.getFactory(OSGiImplementationFactory.class); + OSGiImplementationFactory implementationFactory = + factoryExtensionPoint.getFactory(OSGiImplementationFactory.class); return implementationFactory.createOSGiProperty(name, value); } - public synchronized static String getFrameworkUUID(BundleContext bundleContext) { String uuid = null; if (bundleContext != null) { @@ -171,14 +172,15 @@ public class OSGiHelper { } System.setProperty(FRAMEWORK_UUID, uuid); return uuid; - } - + } + public static ClassLoader createBundleClassLoader(Bundle bundle) { return new BundleClassLoader(bundle); } - + private static class BundleClassLoader extends ClassLoader { private Bundle bundle; + public BundleClassLoader(Bundle bundle) { super(null); this.bundle = bundle; @@ -204,6 +206,29 @@ public class OSGiHelper { return urls; } } - } + } + + /** + * Find out what elements are added between the oldValues and newValues + * @param oldValues + * @param newValues + * @return + */ + public static Collection getAddedItems(Collection oldValues, Collection newValues) { + if (newValues == null) { + newValues = Collections.emptySet(); + } + + Collection deltaInterest = new HashSet(newValues); + if (oldValues == null) { + oldValues = Collections.emptySet(); + } + deltaInterest.removeAll(oldValues); + return deltaInterest; + } + + public static Collection getRemovedItems(Collection oldValues, Collection newValues) { + return getAddedItems(newValues, oldValues); + } } diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java index b1ebeb9df0..b6b80c8524 100644 --- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java +++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java @@ -259,6 +259,10 @@ public class RemoteServiceAdminImpl implements RemoteServiceAdmin, ManagedServic props.put("objectClass", ep.getInterfaces()); props.put("service.imported.configs", ep.getConfigurationTypes()); props.put("timestamp", new Long(System.currentTimeMillis())); + Object bindings = ep.getProperties().get("org.osgi.sca.bindings"); + if (bindings != null) { + props.put("org.osgi.sca.bindings", bindings); + } props.put("event", rsaEvent); return new Event(topic, props); } diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/TopologyManagerImpl.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/TopologyManagerImpl.java index 97c8b345fa..a3f3636cce 100644 --- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/TopologyManagerImpl.java +++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/TopologyManagerImpl.java @@ -23,21 +23,20 @@ import static org.apache.tuscany.sca.implementation.osgi.OSGiProperty.SERVICE_EX import static org.osgi.service.remoteserviceadmin.RemoteConstants.SERVICE_EXPORTED_CONFIGS; import static org.osgi.service.remoteserviceadmin.RemoteConstants.SERVICE_IMPORTED; -import java.util.ArrayList; import java.util.Collection; import java.util.Dictionary; import java.util.HashMap; -import java.util.HashSet; import java.util.Hashtable; import java.util.Iterator; -import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.tuscany.sca.common.java.collection.CollectionMap; import org.apache.tuscany.sca.core.LifeCycleListener; +import org.apache.tuscany.sca.osgi.remoteserviceadmin.impl.EndpointMatcher.ImportAction; import org.osgi.framework.Bundle; import org.osgi.framework.BundleContext; import org.osgi.framework.Constants; @@ -67,23 +66,23 @@ public class TopologyManagerImpl implements ListenerHook, RemoteServiceAdminList private BundleContext context; private ServiceTracker remoteAdmins; - private ServiceRegistration registration; + private volatile ServiceRegistration registration; private ServiceRegistration endpointListener; private ServiceTracker remotableServices; - // Service listeners keyed by the filter - private CollectionMap serviceListeners = new CollectionMap(); + private EndpointMatcher endpointMatcher; private CollectionMap exportedServices = new CollectionMap(); - private CollectionMap importedServices = - new CollectionMap(); + private CollectionMap importedServices = + new CollectionMap(); private Filter remotableServiceFilter; public TopologyManagerImpl(BundleContext context) { this.context = context; + this.endpointMatcher = new EndpointMatcher(context); } public void start() { @@ -113,37 +112,10 @@ public class TopologyManagerImpl implements ListenerHook, RemoteServiceAdminList remotableServices = new ServiceTracker(context, remotableServiceFilter, this); remotableServices.open(true); + Thread thread = new Thread(new ImportTask()); + thread.start(); } - /** - * @see org.osgi.framework.hooks.service.EventHook#event(org.osgi.framework.ServiceEvent, - * java.util.Collection) - */ - /* - public void event(ServiceEvent event, Collection contexts) { - ServiceReference reference = event.getServiceReference(); - if (!remotableServiceFilter.match(reference)) { - // Only export remotable services that are for SCA - return; - } - switch (event.getType()) { - case ServiceEvent.REGISTERED: - exportService(reference); - break; - case ServiceEvent.UNREGISTERING: - unexportService(reference); - break; - case ServiceEvent.MODIFIED: - case ServiceEvent.MODIFIED_ENDMATCH: - // First check if the property changes will impact - // Call remote admin to update the service - unexportService(reference); - exportService(reference); - break; - } - } - */ - public Object addingService(ServiceReference reference) { exportService(reference); return reference.getBundle().getBundleContext().getService(reference); @@ -191,39 +163,13 @@ public class TopologyManagerImpl implements ListenerHook, RemoteServiceAdminList * @see org.osgi.framework.hooks.service.ListenerHook#added(java.util.Collection) */ public void added(Collection listeners) { - boolean changed = false; - String[] filters = null; try { - synchronized (serviceListeners) { - Collection listenerInfos = (Collection)listeners; - for (ListenerInfo l : listenerInfos) { - if (l.getBundleContext().getBundle().getBundleId() == 0L || l.getBundleContext() == context) { - // Ignore system and tuscany bundle - continue; - } - if (!l.isRemoved()) { - String key = l.getFilter(); - if (key == null) { - // key = ""; - // FIXME: It should always match, let's ignore it for now - logger.warning("Service listner without a filter is skipped: " + l); - continue; - } - Collection infos = serviceListeners.get(key); - if (infos == null) { - infos = new HashSet(); - serviceListeners.put(key, infos); - } - infos.add(l); - changed = true; - } + synchronized (endpointMatcher) { + Collection oldFilters = endpointMatcher.getFilters(); + Collection newFilters = endpointMatcher.added(listeners); + if (!OSGiHelper.getAddedItems(oldFilters, newFilters).isEmpty()) { + updateEndpointListenerScope(newFilters); } - if (changed) { - filters = getFilters(); - } - } - if (changed) { - updateEndpointListenerScope(filters); } } catch (Throwable e) { logger.log(Level.SEVERE, e.getMessage(), e); @@ -238,85 +184,24 @@ public class TopologyManagerImpl implements ListenerHook, RemoteServiceAdminList } } - private void updateEndpointListenerScope(String[] filters) { + private void updateEndpointListenerScope(Collection filters) { Dictionary props = new Hashtable(); props.put(ENDPOINT_LISTENER_SCOPE, filters); endpointListener.setProperties(props); } - private String[] getFilters() { - Set filterSet = serviceListeners.keySet(); - String[] filters = filterSet.toArray(new String[filterSet.size()]); - return filters; - } - - private CollectionMap, ListenerInfo> findServiceListeners(EndpointDescription endpointDescription, - String matchedFilter) { - Collection listeners = null; - synchronized (serviceListeners) { - - // First find all the listeners that have the matching filter - listeners = serviceListeners.get(matchedFilter); - if (listeners == null) { - return null; - } - listeners = new ArrayList(listeners); - } - - // Try to partition the listeners by the interface classes - List interfaceNames = endpointDescription.getInterfaces(); - CollectionMap, ListenerInfo> interfaceToListeners = new CollectionMap, ListenerInfo>(); - for (String i : interfaceNames) { - for (Iterator it = listeners.iterator(); it.hasNext();) { - try { - ListenerInfo listener = it.next(); - if (listener.isRemoved()) { - it.remove(); - continue; - } - try { - // The classloading can be synchronzed against the serviceListeners - Class interfaceClass = listener.getBundleContext().getBundle().loadClass(i); - interfaceToListeners.putValue(interfaceClass, listener); - } catch (IllegalStateException e) { - logger.log(Level.WARNING, e.getMessage(), e); - // Ignore the exception - } - } catch (ClassNotFoundException e) { - // Ignore the listener as it cannot load the interface class - } - } - } - return interfaceToListeners; - } - /** * @see org.osgi.framework.hooks.service.ListenerHook#removed(java.util.Collection) */ public void removed(Collection listeners) { - boolean changed = false; - String[] filters = null; try { - synchronized (serviceListeners) { - Collection listenerInfos = (Collection)listeners; - for (ListenerInfo l : listenerInfos) { - if (registration != null && l.getBundleContext() != context) { - String key = l.getFilter(); - if (key == null) { - continue; - } - if (serviceListeners.removeValue(key, l)) { - changed = true; - } - } - } - if (changed) { - filters = getFilters(); + synchronized (endpointMatcher) { + Collection oldFilters = endpointMatcher.getFilters(); + Collection newFilters = endpointMatcher.removed(listeners); + if (!OSGiHelper.getRemovedItems(oldFilters, newFilters).isEmpty()) { + updateEndpointListenerScope(newFilters); } } - if (changed) { - updateEndpointListenerScope(filters); - } } catch (Throwable e) { logger.log(Level.SEVERE, e.getMessage(), e); if (e instanceof Error) { @@ -353,24 +238,27 @@ public class TopologyManagerImpl implements ListenerHook, RemoteServiceAdminList * java.lang.String) */ public void endpointAdded(EndpointDescription endpoint, String matchedFilter) { - importService(endpoint, matchedFilter); + endpointMatcher.added(endpoint, matchedFilter); + // importService(endpoint, matchedFilter); } /** * @see org.osgi.remoteserviceadmin.EndpointListener#removeEndpoint(org.osgi.service.remoteserviceadmin.EndpointDescription) */ public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) { - unimportService(endpoint); + endpointMatcher.removed(endpoint, matchedFilter); + // unimportService(endpoint); } private void importService(EndpointDescription endpoint, String matchedFilter) { Object[] admins = remoteAdmins.getServices(); if (admins == null) { - logger.warning("No RemoteAdmin services are available."); + logger.warning("No Remote Service Admin services are available."); return; } - CollectionMap, ListenerInfo> interfaceToListeners = findServiceListeners(endpoint, matchedFilter); + CollectionMap, ListenerInfo> interfaceToListeners = + endpointMatcher.groupListeners(endpoint, matchedFilter); for (Map.Entry, Collection> e : interfaceToListeners.entrySet()) { Class interfaceClass = e.getKey(); Collection listeners = e.getValue(); @@ -401,16 +289,17 @@ public class TopologyManagerImpl implements ListenerHook, RemoteServiceAdminList RemoteServiceAdmin remoteAdmin = (RemoteServiceAdmin)ra; ImportRegistration importRegistration = remoteAdmin.importService(description); if (importRegistration != null) { - importedServices.putValue(endpoint, importRegistration); + importedServices.putValue(new ImportKey(description, listener), importRegistration); } } } } } - private void unimportService(EndpointDescription endpoint) { + private void unimportService(EndpointDescription endpoint, ListenerInfo listenerInfo) { // Call remote admin to unimport the service - Collection importRegistrations = importedServices.get(endpoint); + Collection importRegistrations = + importedServices.get(new ImportKey(endpoint, listenerInfo)); if (importRegistrations != null) { for (Iterator i = importRegistrations.iterator(); i.hasNext();) { ImportRegistration imported = i.next(); @@ -435,8 +324,76 @@ public class TopologyManagerImpl implements ListenerHook, RemoteServiceAdminList remoteAdmins.close(); remoteAdmins = null; } - synchronized (serviceListeners) { - serviceListeners.clear(); + if (endpointMatcher != null) { + endpointMatcher.clear(); + } + } + + private class ImportTask implements Runnable { + public void run() { + while (registration != null) { + BlockingQueue queue = endpointMatcher.getImportQueue(); + ImportAction action = null; + try { + action = queue.poll(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // Ignore + } + if (action != null) { + if (action.type == ImportAction.Type.Add) { + importService(action.endpointDescription, action.listenerInfo.getFilter()); + } else if (action.type == ImportAction.Type.Remove) { + unimportService(action.endpointDescription, action.listenerInfo); + } + } + } + } + } + + private static class ImportKey { + private EndpointDescription endpointDescription; + + /** + * @param endpointDescription + * @param listenerInfo + */ + private ImportKey(EndpointDescription endpointDescription, ListenerInfo listenerInfo) { + super(); + this.endpointDescription = endpointDescription; + this.listenerInfo = listenerInfo; + } + + private ListenerInfo listenerInfo; + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((endpointDescription == null) ? 0 : endpointDescription.hashCode()); + result = prime * result + ((listenerInfo == null) ? 0 : listenerInfo.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + ImportKey other = (ImportKey)obj; + if (endpointDescription == null) { + if (other.endpointDescription != null) + return false; + } else if (!endpointDescription.equals(other.endpointDescription)) + return false; + if (listenerInfo == null) { + if (other.listenerInfo != null) + return false; + } else if (!listenerInfo.equals(other.listenerInfo)) + return false; + return true; } } diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java index dbdbbaead8..e9260cef53 100644 --- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java +++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java @@ -151,7 +151,6 @@ public abstract class AbstractDiscoveryService implements Discovery, LifeCycleLi EndpointListener listener = (EndpointListener)service; Collection oldFilters = null; Collection newFilters = null; - Collection endpoints = null; synchronized (this) { if (logger.isLoggable(Level.FINE)) { logger.fine("updating listener: " + listener); @@ -178,6 +177,16 @@ public abstract class AbstractDiscoveryService implements Discovery, LifeCycleLi triggerCallbacks(listener, next, sd, ADDED); } } + // Find removed filters + deltaInterest = getDelta(newInterest, oldInterest); + + i = deltaInterest.iterator(); + while (i.hasNext()) { + String next = i.next(); + for (EndpointDescription sd : endpointDescriptions.keySet()) { + triggerCallbacks(listener, next, sd, REMOVED); + } + } } private Collection getDelta(Collection oldInterest, Collection newInterest) { -- cgit v1.2.3