summaryrefslogtreecommitdiffstats
path: root/tags/java/sca/2.0-M4-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/TopologyManagerImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'tags/java/sca/2.0-M4-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/TopologyManagerImpl.java')
-rw-r--r--tags/java/sca/2.0-M4-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/TopologyManagerImpl.java410
1 files changed, 410 insertions, 0 deletions
diff --git a/tags/java/sca/2.0-M4-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/TopologyManagerImpl.java b/tags/java/sca/2.0-M4-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/TopologyManagerImpl.java
new file mode 100644
index 0000000000..1967fa6e4a
--- /dev/null
+++ b/tags/java/sca/2.0-M4-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/TopologyManagerImpl.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.osgi.remoteserviceadmin.impl;
+
+import static org.apache.tuscany.sca.implementation.osgi.OSGiProperty.SERVICE_EXPORTED_INTERFACES;
+import static org.apache.tuscany.sca.osgi.remoteserviceadmin.RemoteConstants.SERVICE_EXPORTED_CONFIGS;
+import static org.apache.tuscany.sca.osgi.remoteserviceadmin.RemoteConstants.SERVICE_IMPORTED;
+
+import java.util.Collection;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.tuscany.sca.common.java.collection.CollectionMap;
+import org.apache.tuscany.sca.core.LifeCycleListener;
+import org.apache.tuscany.sca.osgi.remoteserviceadmin.EndpointDescription;
+import org.apache.tuscany.sca.osgi.remoteserviceadmin.EndpointListener;
+import org.apache.tuscany.sca.osgi.remoteserviceadmin.ExportRegistration;
+import org.apache.tuscany.sca.osgi.remoteserviceadmin.ImportRegistration;
+import org.apache.tuscany.sca.osgi.remoteserviceadmin.RemoteServiceAdmin;
+import org.apache.tuscany.sca.osgi.remoteserviceadmin.RemoteServiceAdminEvent;
+import org.apache.tuscany.sca.osgi.remoteserviceadmin.RemoteServiceAdminListener;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.Filter;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.framework.hooks.service.ListenerHook;
+import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
+
+/**
+ * Implementation of Remote Controller
+ */
+public class TopologyManagerImpl implements ListenerHook, RemoteServiceAdminListener, EndpointListener,
+ ServiceTrackerCustomizer, LifeCycleListener /*, EventHook */{
+ private final static Logger logger = Logger.getLogger(TopologyManagerImpl.class.getName());
+ public final static String ENDPOINT_LOCAL = "service.local";
+
+ private BundleContext context;
+ private ServiceTracker remoteAdmins;
+
+ private ServiceRegistration registration;
+ private ServiceRegistration endpointListener;
+
+ private ServiceTracker remotableServices;
+
+ // Service listeners keyed by the filter
+ private CollectionMap<String, ListenerInfo> serviceListeners = new CollectionMap<String, ListenerInfo>();
+
+ private CollectionMap<ServiceReference, ExportRegistration> exportedServices =
+ new CollectionMap<ServiceReference, ExportRegistration>();
+ private CollectionMap<EndpointDescription, ImportRegistration> importedServices =
+ new CollectionMap<EndpointDescription, ImportRegistration>();
+
+ private Filter remotableServiceFilter;
+
+ public TopologyManagerImpl(BundleContext context) {
+ this.context = context;
+ }
+
+ public void start() {
+ String filter =
+ "(& (!(" + SERVICE_IMPORTED
+ + "=*)) ("
+ + SERVICE_EXPORTED_INTERFACES
+ + "=*) ("
+ + SERVICE_EXPORTED_CONFIGS
+ + "=org.osgi.sca) )";
+ try {
+ remotableServiceFilter = context.createFilter(filter);
+ } catch (InvalidSyntaxException e) {
+ // Ignore
+ }
+
+ endpointListener = context.registerService(EndpointListener.class.getName(), this, null);
+ remoteAdmins = new ServiceTracker(this.context, RemoteServiceAdmin.class.getName(), null);
+ remoteAdmins.open();
+
+ // DO NOT register EventHook.class.getName() as it cannot report existing services
+ String interfaceNames[] =
+ new String[] {ListenerHook.class.getName(), RemoteServiceAdminListener.class.getName()};
+ // The registration will trigger the added() method before registration is assigned
+ registration = context.registerService(interfaceNames, this, null);
+
+ remotableServices = new ServiceTracker(context, remotableServiceFilter, this);
+ remotableServices.open(true);
+
+ }
+
+ /**
+ * @see org.osgi.framework.hooks.service.EventHook#event(org.osgi.framework.ServiceEvent,
+ * java.util.Collection)
+ */
+ /*
+ public void event(ServiceEvent event, Collection contexts) {
+ ServiceReference reference = event.getServiceReference();
+ if (!remotableServiceFilter.match(reference)) {
+ // Only export remotable services that are for SCA
+ return;
+ }
+ switch (event.getType()) {
+ case ServiceEvent.REGISTERED:
+ exportService(reference);
+ break;
+ case ServiceEvent.UNREGISTERING:
+ unexportService(reference);
+ break;
+ case ServiceEvent.MODIFIED:
+ case ServiceEvent.MODIFIED_ENDMATCH:
+ // First check if the property changes will impact
+ // Call remote admin to update the service
+ unexportService(reference);
+ exportService(reference);
+ break;
+ }
+ }
+ */
+
+ public Object addingService(ServiceReference reference) {
+ exportService(reference);
+ return reference.getBundle().getBundleContext().getService(reference);
+ }
+
+ public void modifiedService(ServiceReference reference, Object service) {
+ unexportService(reference);
+ exportService(reference);
+ }
+
+ public void removedService(ServiceReference reference, Object service) {
+ unexportService(reference);
+ }
+
+ private void unexportService(ServiceReference reference) {
+ // Call remote admin to unexport the service
+ Collection<ExportRegistration> exportRegistrations = exportedServices.get(reference);
+ if (exportRegistrations != null) {
+ for (Iterator<ExportRegistration> i = exportRegistrations.iterator(); i.hasNext();) {
+ ExportRegistration exported = i.next();
+ exported.close();
+ i.remove();
+ }
+ }
+ }
+
+ private void exportService(ServiceReference reference) {
+ // Call remote admin to export the service
+ Object[] admins = remoteAdmins.getServices();
+ if (admins == null) {
+ // Ignore
+ logger.warning("No RemoteAdmin services are available.");
+ } else {
+ for (Object ra : admins) {
+ RemoteServiceAdmin remoteAdmin = (RemoteServiceAdmin)ra;
+ List<ExportRegistration> exportRegistrations = remoteAdmin.exportService(reference, null);
+ if (exportRegistrations != null && !exportRegistrations.isEmpty()) {
+ exportedServices.putValues(reference, exportRegistrations);
+ }
+ }
+ }
+ }
+
+ /**
+ * @see org.osgi.framework.hooks.service.ListenerHook#added(java.util.Collection)
+ */
+ public void added(Collection listeners) {
+ synchronized (serviceListeners) {
+ try {
+ Collection<ListenerInfo> listenerInfos = (Collection<ListenerInfo>)listeners;
+ boolean changed = false;
+ for (ListenerInfo l : listenerInfos) {
+ if (!l.isRemoved() && l.getBundleContext() != context) {
+ String key = l.getFilter();
+ if (key == null) {
+ // key = "";
+ // FIXME: It should always match, let's ignore it for now
+ logger.warning("Service listner without a filter is skipped: " + l);
+ continue;
+ }
+ Collection<ListenerInfo> infos = serviceListeners.get(key);
+ if (infos == null) {
+ infos = new HashSet<ListenerInfo>();
+ serviceListeners.put(key, infos);
+ }
+ infos.add(l);
+ changed = true;
+ }
+ }
+ if (changed) {
+ updateEndpointListenerScope();
+ }
+ } catch (Throwable e) {
+ logger.log(Level.SEVERE, e.getMessage(), e);
+ if (e instanceof Error) {
+ throw (Error)e;
+ } else if (e instanceof RuntimeException) {
+ throw (RuntimeException)e;
+ } else {
+ // Should not happen
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ private void updateEndpointListenerScope() {
+ Set<String> filterSet = serviceListeners.keySet();
+
+ Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(ENDPOINT_LISTENER_SCOPE, filterSet.toArray(new String[filterSet.size()]));
+ endpointListener.setProperties(props);
+ }
+
+ private CollectionMap<Class<?>, ListenerInfo> findServiceListeners(EndpointDescription endpointDescription,
+ String matchedFilter) {
+ synchronized (serviceListeners) {
+
+ // First find all the listeners that have the matching filter
+ Collection<ListenerInfo> listeners = serviceListeners.get(matchedFilter);
+ if (listeners == null) {
+ return null;
+ }
+
+ // Try to partition the listeners by the interface classes
+ List<String> interfaceNames = endpointDescription.getInterfaces();
+ CollectionMap<Class<?>, ListenerInfo> interfaceToListeners = new CollectionMap<Class<?>, ListenerInfo>();
+ for (String i : interfaceNames) {
+ for (Iterator<ListenerInfo> it = listeners.iterator(); it.hasNext();) {
+ try {
+ ListenerInfo listener = it.next();
+ if (listener.isRemoved()) {
+ it.remove();
+ continue;
+ }
+ try {
+ Class<?> interfaceClass = listener.getBundleContext().getBundle().loadClass(i);
+ interfaceToListeners.putValue(interfaceClass, listener);
+ } catch (IllegalStateException e) {
+ logger.log(Level.WARNING, e.getMessage(), e);
+ // Ignore the exception
+ }
+ } catch (ClassNotFoundException e) {
+ // Ignore the listener as it cannot load the interface class
+ }
+ }
+ }
+ return interfaceToListeners;
+ }
+ }
+
+ /**
+ * @see org.osgi.framework.hooks.service.ListenerHook#removed(java.util.Collection)
+ */
+ public void removed(Collection listeners) {
+ synchronized (serviceListeners) {
+ try {
+ Collection<ListenerInfo> listenerInfos = (Collection<ListenerInfo>)listeners;
+ boolean changed = false;
+ for (ListenerInfo l : listenerInfos) {
+ if (registration != null && l.getBundleContext() != context) {
+ String key = l.getFilter();
+ if (key == null) {
+ continue;
+ }
+ if (serviceListeners.removeValue(key, l)) {
+ changed = true;
+ }
+ }
+ }
+ if (changed) {
+ updateEndpointListenerScope();
+ }
+ } catch (Throwable e) {
+ logger.log(Level.SEVERE, e.getMessage(), e);
+ if (e instanceof Error) {
+ throw (Error)e;
+ } else if (e instanceof RuntimeException) {
+ throw (RuntimeException)e;
+ } else {
+ // Should not happen
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * @see org.apache.tuscany.sca.osgi.service.remoteadmin.RemoteAdminListener#remoteAdminEvent(org.apache.tuscany.sca.osgi.service.remoteadmin.RemoteAdminEvent)
+ */
+ public void remoteAdminEvent(RemoteServiceAdminEvent event) {
+ switch (event.getType()) {
+ case RemoteServiceAdminEvent.EXPORT_ERROR:
+ case RemoteServiceAdminEvent.EXPORT_REGISTRATION:
+ case RemoteServiceAdminEvent.EXPORT_UNREGISTRATION:
+ case RemoteServiceAdminEvent.EXPORT_WARNING:
+ break;
+ case RemoteServiceAdminEvent.IMPORT_ERROR:
+ case RemoteServiceAdminEvent.IMPORT_REGISTRATION:
+ case RemoteServiceAdminEvent.IMPORT_UNREGISTRATION:
+ case RemoteServiceAdminEvent.IMPORT_WARNING:
+ break;
+ }
+ }
+
+ /**
+ * @see org.apache.tuscany.sca.osgi.remoteserviceadmin.EndpointListener#addEndpoint(org.apache.tuscany.sca.osgi.remoteserviceadmin.EndpointDescription,
+ * java.lang.String)
+ */
+ public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
+ importService(endpoint, matchedFilter);
+ }
+
+ /**
+ * @see org.apache.tuscany.sca.osgi.remoteserviceadmin.EndpointListener#removeEndpoint(org.apache.tuscany.sca.osgi.remoteserviceadmin.EndpointDescription)
+ */
+ public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
+ unimportService(endpoint);
+ }
+
+ private void importService(EndpointDescription endpoint, String matchedFilter) {
+ Object[] admins = remoteAdmins.getServices();
+ if (admins == null) {
+ logger.warning("No RemoteAdmin services are available.");
+ return;
+ }
+
+ CollectionMap<Class<?>, ListenerInfo> interfaceToListeners = findServiceListeners(endpoint, matchedFilter);
+ for (Map.Entry<Class<?>, Collection<ListenerInfo>> e : interfaceToListeners.entrySet()) {
+ Class<?> interfaceClass = e.getKey();
+ Collection<ListenerInfo> listeners = e.getValue();
+ // Get a listener
+ ListenerInfo listener = listeners.iterator().next();
+ Bundle bundle = listener.getBundleContext().getBundle();
+
+ Map<String, Object> props = new HashMap<String, Object>(endpoint.getProperties());
+ props.put(Bundle.class.getName(), bundle);
+ props.put(Constants.OBJECTCLASS, new String[] {interfaceClass.getName()});
+ EndpointDescription description = new EndpointDescription(props);
+
+ if (admins != null) {
+ for (Object ra : admins) {
+ RemoteServiceAdmin remoteAdmin = (RemoteServiceAdmin)ra;
+ ImportRegistration importRegistration = remoteAdmin.importService(description);
+ if (importRegistration != null) {
+ importedServices.putValue(endpoint, importRegistration);
+ }
+ }
+ }
+ }
+ }
+
+ private void unimportService(EndpointDescription endpoint) {
+ // Call remote admin to unimport the service
+ Collection<ImportRegistration> importRegistrations = importedServices.get(endpoint);
+ if (importRegistrations != null) {
+ for (Iterator<ImportRegistration> i = importRegistrations.iterator(); i.hasNext();) {
+ ImportRegistration imported = i.next();
+ imported.close();
+ i.remove();
+ }
+ }
+ }
+
+ public void stop() {
+ remotableServices.close();
+
+ if (registration != null) {
+ try {
+ registration.unregister();
+ } catch (IllegalStateException e) {
+ // The service has been unregistered, ignore it
+ }
+ registration = null;
+ }
+ if (remoteAdmins != null) {
+ remoteAdmins.close();
+ remoteAdmins = null;
+ }
+ synchronized (serviceListeners) {
+ serviceListeners.clear();
+ }
+ }
+
+}