summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service
diff options
context:
space:
mode:
Diffstat (limited to 'sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service')
-rw-r--r--sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java265
-rw-r--r--sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/Discovery.java66
-rw-r--r--sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DiscoveryActivator.java57
-rw-r--r--sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java140
-rw-r--r--sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/LocalDiscoveryService.java262
5 files changed, 790 insertions, 0 deletions
diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java b/sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java
new file mode 100644
index 0000000000..fd57dbded8
--- /dev/null
+++ b/sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/AbstractDiscoveryService.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.osgi.service.discovery.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.LifeCycleListener;
+import org.apache.tuscany.sca.node.NodeFactory;
+import org.apache.tuscany.sca.node.impl.NodeFactoryImpl;
+import org.apache.tuscany.sca.osgi.remoteserviceadmin.impl.OSGiHelper;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.Filter;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+import org.osgi.util.tracker.ServiceTracker;
+
+/**
+ *
+ */
+public abstract class AbstractDiscoveryService implements Discovery, LifeCycleListener {
+ protected final static int ADDED = 0x1;
+ protected final static int REMOVED = 0x2;
+ protected final static int MODIFIED = 0x4;
+
+ protected final static Logger logger = Logger.getLogger(AbstractDiscoveryService.class.getName());
+
+ protected BundleContext context;
+ protected ExtensionPointRegistry registry;
+ // private WorkScheduler workScheduler;
+
+ private Map<EndpointListener, Collection<String>> listenersToFilters =
+ new ConcurrentHashMap<EndpointListener, Collection<String>>();
+
+ protected Map<EndpointDescription, Bundle> endpointDescriptions =
+ new ConcurrentHashMap<EndpointDescription, Bundle>();
+ private ServiceTracker trackerTracker;
+
+ public AbstractDiscoveryService(BundleContext context) {
+ super();
+ this.context = context;
+ }
+
+ public void start() {
+ getExtensionPointRegistry();
+ // UtilityExtensionPoint utilityExtensionPoint = registry.getExtensionPoint(UtilityExtensionPoint.class);
+ // this.workScheduler = utilityExtensionPoint.getUtility(WorkScheduler.class);
+
+ // track the registration of EndpointListener
+ trackerTracker = new ServiceTracker(this.context, EndpointListener.class.getName(), null) {
+ public Object addingService(ServiceReference reference) {
+ Object result = super.addingService(reference);
+ cacheTracker(reference, result);
+ return result;
+ }
+
+ public void modifiedService(ServiceReference reference, Object service) {
+ super.modifiedService(reference, service);
+ updateTracker(reference, service);
+ }
+
+ public void removedService(ServiceReference reference, Object service) {
+ super.removedService(reference, service);
+ clearTracker(service);
+ }
+ };
+
+ trackerTracker.open();
+ }
+
+ public void stop() {
+ trackerTracker.close();
+ }
+
+ protected ExtensionPointRegistry getExtensionPointRegistry() {
+ NodeFactoryImpl factory = (NodeFactoryImpl)NodeFactory.getInstance();
+ factory.init();
+ ServiceTracker tracker = new ServiceTracker(context, ExtensionPointRegistry.class.getName(), null);
+ tracker.open();
+ // tracker.waitForService(1000);
+ registry = (ExtensionPointRegistry)tracker.getService();
+ tracker.close();
+ return registry;
+ }
+
+ protected Dictionary<String, Object> getProperties() {
+ Dictionary headers = context.getBundle().getHeaders();
+ Hashtable<String, Object> props = new Hashtable<String, Object>();
+ props.put(PRODUCT_NAME, "Apache Tuscany SCA");
+ props.put(PRODUCT_VERSION, headers.get(Constants.BUNDLE_VERSION));
+ props.put(VENDOR_NAME, headers.get(Constants.BUNDLE_VENDOR));
+ // props.put(SUPPORTED_PROTOCOLS, new String[] {"local", "org.osgi.sca"});
+ return props;
+ }
+
+ private void cacheTracker(ServiceReference reference, Object service) {
+ if (service instanceof EndpointListener) {
+ EndpointListener listener = (EndpointListener)service;
+ Collection<String> filters = null;
+ synchronized (this) {
+ filters = addTracker(reference, listener, EndpointListener.ENDPOINT_LISTENER_SCOPE);
+ // Take a snapshot of the endpoints
+ triggerCallbacks(null, filters, listener);
+ }
+ }
+ }
+
+ private void clearTracker(Object service) {
+ if (service instanceof EndpointListener) {
+ synchronized (this) {
+ removeTracker((EndpointListener)service);
+ }
+ }
+ }
+
+ private void updateTracker(ServiceReference reference, Object service) {
+ if (service instanceof EndpointListener) {
+ EndpointListener listener = (EndpointListener)service;
+ Collection<String> oldFilters = null;
+ Collection<String> newFilters = null;
+ synchronized (this) {
+ if (logger.isLoggable(Level.FINE)) {
+ logger.fine("updating listener: " + listener);
+ }
+ oldFilters = removeTracker(listener);
+ newFilters = addTracker(reference, listener, EndpointListener.ENDPOINT_LISTENER_SCOPE);
+ triggerCallbacks(oldFilters, newFilters, listener);
+ }
+ }
+ }
+
+ private void triggerCallbacks(Collection<String> oldInterest,
+ Collection<String> newInterest,
+ EndpointListener listener) {
+ // compute delta between old & new interfaces/filters and
+ // trigger callbacks for any entries in servicesInfo that
+ // match any *additional* interface/filters
+ Collection<String> deltaInterest = getDelta(oldInterest, newInterest);
+
+ Iterator<String> i = deltaInterest.iterator();
+ while (i.hasNext()) {
+ String next = i.next();
+ for (EndpointDescription sd : endpointDescriptions.keySet()) {
+ triggerCallbacks(listener, next, sd, ADDED);
+ }
+ }
+ // Find removed filters
+ deltaInterest = getDelta(newInterest, oldInterest);
+
+ i = deltaInterest.iterator();
+ while (i.hasNext()) {
+ String next = i.next();
+ for (EndpointDescription sd : endpointDescriptions.keySet()) {
+ triggerCallbacks(listener, next, sd, REMOVED);
+ }
+ }
+ }
+
+ private Collection<String> getDelta(Collection<String> oldInterest, Collection<String> newInterest) {
+ if (newInterest == null) {
+ newInterest = Collections.emptySet();
+ }
+
+ Collection<String> deltaInterest = new ArrayList<String>(newInterest);
+ if (oldInterest == null) {
+ oldInterest = Collections.emptySet();
+ }
+ deltaInterest.removeAll(oldInterest);
+ return deltaInterest;
+ }
+
+ /**
+ * Notify the endpoint listener
+ * @param listener
+ * @param matchedFilter
+ * @param endpoint
+ * @param type
+ */
+ private static void notify(EndpointListener listener, String matchedFilter, EndpointDescription endpoint, int type) {
+ switch (type) {
+ case ADDED:
+ listener.endpointAdded(endpoint, matchedFilter);
+ break;
+ case REMOVED:
+ listener.endpointRemoved(endpoint, matchedFilter);
+ break;
+ case MODIFIED:
+ listener.endpointRemoved(endpoint, matchedFilter);
+ listener.endpointAdded(endpoint, matchedFilter);
+ break;
+ }
+ }
+
+ private void triggerCallbacks(EndpointListener listener,
+ String matchedFilter,
+ EndpointDescription endpoint,
+ int type) {
+ // workScheduler.scheduleWork(new Notifier(listener, matchedFilter, endpoint, type));
+ notify(listener, matchedFilter, endpoint, type);
+ }
+
+ private boolean filterMatches(String filterValue, EndpointDescription sd) {
+ Filter filter = OSGiHelper.createFilter(context, filterValue);
+ Hashtable<String, Object> props = new Hashtable<String, Object>(sd.getProperties());
+ // Add two faked properties to make the filter match
+ props.put(Constants.OBJECTCLASS, sd.getInterfaces());
+ props.put(RemoteConstants.SERVICE_IMPORTED, "true");
+ return filter != null ? filter.match(props) : false;
+ }
+
+ private Collection<String> removeTracker(EndpointListener listener) {
+ return listenersToFilters.remove(listener);
+ }
+
+ private Collection<String> addTracker(ServiceReference reference, EndpointListener listener, String property) {
+ Collection<String> collection = OSGiHelper.getStringCollection(reference, property);
+ if (collection != null && !collection.isEmpty()) {
+ listenersToFilters.put(listener, new ArrayList<String>(collection));
+ }
+ return collection;
+ }
+
+ protected void endpointChanged(EndpointDescription sd, int type) {
+ synchronized (this) {
+ for (Map.Entry<EndpointListener, Collection<String>> entry : listenersToFilters.entrySet()) {
+ for (String filter : entry.getValue()) {
+ if (filterMatches(filter, sd)) {
+ triggerCallbacks(entry.getKey(), filter, sd, type);
+ }
+ }
+ }
+ }
+ }
+
+}
diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/Discovery.java b/sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/Discovery.java
new file mode 100644
index 0000000000..3ad1e53fee
--- /dev/null
+++ b/sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/Discovery.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.osgi.service.discovery.impl;
+
+/**
+ * Every Discovery Provider registers a service implementing this interface.
+ * This service is registered with extra properties identified at the beginning
+ * of this interface to denote the name of the product providing Discovery
+ * functionality, its version, vendor, used protocols etc..
+ * <p>
+ * Discovery allows to publish services exposed for remote access as well as to
+ * search for remote services.
+ * <p>
+ * Discovery service implementations usually rely on some discovery protocols or
+ * other information distribution means.
+ *
+ * @ThreadSafe
+ */
+public interface Discovery {
+
+ /**
+ * ServiceRegistration property for the name of the Discovery product.
+ * <p>
+ * Value of this property is of type <code>String</code>.
+ */
+ String PRODUCT_NAME = "osgi.remote.discovery.product";
+
+ /**
+ * ServiceRegistration property for the version of the Discovery product.
+ * <p>
+ * Value of this property is of type <code>String</code>.
+ */
+ String PRODUCT_VERSION = "osgi.remote.discovery.product.version";
+
+ /**
+ * ServiceRegistration property for the Discovery product vendor name.
+ * <p>
+ * Value of this property is of type <code>String</code>.
+ */
+ String VENDOR_NAME = "osgi.remote.discovery.vendor";
+
+ /**
+ * ServiceRegistration property that lists the discovery protocols used by
+ * this Discovery service.
+ * <p>
+ * Value of this property is of type
+ * <code>Collection (&lt;? extends String&gt;)</code>.
+ */
+ String SUPPORTED_PROTOCOLS = "osgi.remote.discovery.supported_protocols";
+}
diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DiscoveryActivator.java b/sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DiscoveryActivator.java
new file mode 100644
index 0000000000..32bd10b77b
--- /dev/null
+++ b/sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DiscoveryActivator.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.osgi.service.discovery.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+
+public class DiscoveryActivator implements BundleActivator {
+ private List<AbstractDiscoveryService> discoveryServices = new ArrayList<AbstractDiscoveryService>();
+ private List<ServiceRegistration> discoveryServiceRegistrations = new ArrayList<ServiceRegistration>();
+
+ public void start(BundleContext context) {
+ discoveryServices.add(new LocalDiscoveryService(context));
+
+ discoveryServices.add(new DomainDiscoveryService(context));
+
+ for (AbstractDiscoveryService service : discoveryServices) {
+ service.start();
+ ServiceRegistration registration =
+ context.registerService(Discovery.class.getName(), service, service.getProperties());
+ discoveryServiceRegistrations.add(registration);
+ }
+ }
+
+ public void stop(BundleContext context) {
+ for (ServiceRegistration registration : discoveryServiceRegistrations) {
+ try {
+ registration.unregister();
+ } catch (IllegalStateException e) {
+ // The service has been unregistered, ignore it
+ }
+ }
+ for (AbstractDiscoveryService service : discoveryServices) {
+ service.stop();
+ }
+ }
+}
diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java b/sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java
new file mode 100644
index 0000000000..f8b4c3be8c
--- /dev/null
+++ b/sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.osgi.service.discovery.impl;
+
+import static org.apache.tuscany.sca.osgi.remoteserviceadmin.impl.EndpointHelper.createEndpointDescription;
+
+import java.util.Dictionary;
+
+import org.apache.tuscany.sca.assembly.Endpoint;
+import org.apache.tuscany.sca.assembly.Implementation;
+import org.apache.tuscany.sca.core.LifeCycleListener;
+import org.apache.tuscany.sca.implementation.osgi.OSGiImplementation;
+import org.apache.tuscany.sca.node.configuration.NodeConfiguration;
+import org.apache.tuscany.sca.runtime.DomainRegistryFactory;
+import org.apache.tuscany.sca.runtime.EndpointListener;
+import org.apache.tuscany.sca.runtime.DomainRegistry;
+import org.apache.tuscany.sca.runtime.ExtensibleDomainRegistryFactory;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+
+/**
+ * Discovery service based on the distributed SCA domain
+ */
+public class DomainDiscoveryService extends AbstractDiscoveryService implements EndpointListener {
+ private DomainRegistryFactory domainRegistryFactory;
+ private DomainRegistry domainRegistry;
+
+ public DomainDiscoveryService(BundleContext context) {
+ super(context);
+ }
+
+ public void start() {
+ super.start();
+ this.domainRegistryFactory = ExtensibleDomainRegistryFactory.getInstance(registry);
+ domainRegistryFactory.addListener(this);
+
+ // [rfeng] Starting of the endpoint registry takes a long time and it leaves the bundle
+ // state to be starting. When the registry is started, remote endpoints come in and that
+ // triggers the classloading from this bundle.
+ Thread thread = new Thread() {
+ public void run() {
+ startEndpointRegistry();
+ }
+ };
+ thread.start();
+ }
+
+ private synchronized void startEndpointRegistry() {
+ // The following code forced the start() of the domain registry in absense of services
+ String domainRegistryURI = context.getProperty("org.osgi.sca.domain.registry");
+ if (domainRegistryURI == null) {
+ domainRegistryURI = NodeConfiguration.DEFAULT_DOMAIN_REGISTRY_URI;
+ }
+ String domainURI = context.getProperty("org.osgi.sca.domain.uri");
+ if (domainURI == null) {
+ domainURI = NodeConfiguration.DEFAULT_DOMAIN_URI;
+ }
+ if (domainRegistry != null) {
+ domainRegistry = domainRegistryFactory.getEndpointRegistry(domainRegistryURI, domainURI);
+ }
+ }
+
+ public void endpointAdded(Endpoint endpoint) {
+ Implementation impl = endpoint.getComponent().getImplementation();
+
+ /*
+ if (!(impl instanceof OSGiImplementation)) {
+ return;
+ }
+ */
+
+ BundleContext bundleContext = null;
+ // Remote endpoint doesn't have a bundle
+ if (!endpoint.isRemote()) {
+ OSGiImplementation osgiImpl = (OSGiImplementation)impl;
+ Bundle bundle = osgiImpl.getBundle();
+ bundleContext = bundle != null ? bundle.getBundleContext() : null;
+ }
+
+ // Notify the endpoint listeners
+ EndpointDescription description = createEndpointDescription(bundleContext, endpoint);
+ // Set the owning bundle to runtime bundle to avoid NPE
+ synchronized (this) {
+ endpointDescriptions.put(description, context.getBundle());
+ endpointChanged(description, ADDED);
+ }
+ }
+
+ public void endpointRemoved(Endpoint endpoint) {
+ EndpointDescription description = createEndpointDescription(context, endpoint);
+ synchronized (this) {
+ endpointDescriptions.remove(description);
+ endpointChanged(description, REMOVED);
+ }
+ }
+
+ public void endpointUpdated(Endpoint oldEndpoint, Endpoint newEndpoint) {
+ // FIXME: This is a quick and dirty way for the update
+ endpointRemoved(oldEndpoint);
+ endpointAdded(newEndpoint);
+ }
+
+ public void stop() {
+ if (domainRegistryFactory != null) {
+ domainRegistryFactory.removeListener(this);
+ if (domainRegistry instanceof LifeCycleListener) {
+ ((LifeCycleListener)domainRegistry).stop();
+ }
+ domainRegistryFactory = null;
+ domainRegistry = null;
+ super.stop();
+ }
+ }
+
+ @Override
+ protected Dictionary<String, Object> getProperties() {
+ Dictionary<String, Object> props = super.getProperties();
+ props.put(SUPPORTED_PROTOCOLS, new String[] {"org.osgi.sca"});
+ return props;
+ }
+
+}
diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/LocalDiscoveryService.java b/sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/LocalDiscoveryService.java
new file mode 100644
index 0000000000..dc9ae096dd
--- /dev/null
+++ b/sca-java-2.x/tags/2.0.1-RC1/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/LocalDiscoveryService.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.osgi.service.discovery.impl;
+
+import static org.apache.tuscany.sca.osgi.remoteserviceadmin.impl.OSGiHelper.getConfiguration;
+import static org.osgi.service.remoteserviceadmin.RemoteConstants.ENDPOINT_FRAMEWORK_UUID;
+import static org.osgi.service.remoteserviceadmin.RemoteConstants.ENDPOINT_ID;
+import static org.osgi.service.remoteserviceadmin.RemoteConstants.ENDPOINT_SERVICE_ID;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import java.util.Map.Entry;
+import java.util.logging.Level;
+
+import javax.xml.namespace.QName;
+
+import org.apache.tuscany.sca.assembly.Binding;
+import org.apache.tuscany.sca.core.UtilityExtensionPoint;
+import org.apache.tuscany.sca.deployment.Deployer;
+import org.apache.tuscany.sca.implementation.osgi.SCAConfig;
+import org.apache.tuscany.sca.implementation.osgi.ServiceDescription;
+import org.apache.tuscany.sca.implementation.osgi.ServiceDescriptions;
+import org.apache.tuscany.sca.osgi.remoteserviceadmin.impl.OSGiHelper;
+import org.apache.tuscany.sca.policy.Intent;
+import org.apache.tuscany.sca.policy.PolicySet;
+import org.oasisopen.sca.ServiceRuntimeException;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.BundleEvent;
+import org.osgi.framework.Constants;
+import org.osgi.framework.Filter;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.util.tracker.BundleTracker;
+import org.osgi.util.tracker.BundleTrackerCustomizer;
+import org.osgi.util.tracker.ServiceTracker;
+
+public class LocalDiscoveryService extends AbstractDiscoveryService implements BundleTrackerCustomizer {
+ private Deployer deployer;
+ private BundleTracker bundleTracker;
+ private Collection<ExtenderConfiguration> extenders = new ArrayList<ExtenderConfiguration>();
+
+ public LocalDiscoveryService(BundleContext context) {
+ super(context);
+ }
+
+ public void start() {
+ super.start();
+
+ UtilityExtensionPoint utilities = this.registry.getExtensionPoint(UtilityExtensionPoint.class);
+ this.deployer = utilities.getUtility(Deployer.class);
+ bundleTracker = new BundleTracker(context, Bundle.ACTIVE | Bundle.STARTING, this);
+ bundleTracker.open();
+ }
+
+ public static ServiceTracker getTracker(BundleContext context) {
+ Filter filter = null;
+ try {
+ filter =
+ context.createFilter("(& (" + Discovery.SUPPORTED_PROTOCOLS
+ + "=local) ("
+ + Constants.OBJECTCLASS
+ + "="
+ + Discovery.class.getName()
+ + "))");
+ } catch (InvalidSyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ return new ServiceTracker(context, filter, null);
+ }
+
+ private EndpointDescription createEndpointDescription(ServiceDescription sd) {
+ Map<String, Object> props = new HashMap<String, Object>(sd.getProperties());
+ props.put(Constants.OBJECTCLASS, sd.getInterfaces().toArray(new String[sd.getInterfaces().size()]));
+ if (!props.containsKey(ENDPOINT_SERVICE_ID)) {
+ props.put(ENDPOINT_SERVICE_ID, Long.valueOf(System.currentTimeMillis()));
+ }
+ if (!props.containsKey(ENDPOINT_FRAMEWORK_UUID)) {
+ props.put(ENDPOINT_FRAMEWORK_UUID, OSGiHelper.getFrameworkUUID(context));
+ }
+ if (!props.containsKey(ENDPOINT_ID)) {
+ props.put(ENDPOINT_ID, UUID.randomUUID().toString());
+ }
+
+ EndpointDescription sed = new EndpointDescription(props);
+ return sed;
+ }
+
+ private void removeServicesDeclaredInBundle(Bundle bundle) {
+ for (Iterator<Map.Entry<EndpointDescription, Bundle>> i = endpointDescriptions.entrySet().iterator(); i.hasNext();) {
+ Entry<EndpointDescription, Bundle> entry = i.next();
+ if (entry.getValue().equals(bundle)) {
+ serviceDescriptionRemoved(entry.getKey());
+ i.remove();
+ }
+ }
+ }
+
+ private void serviceDescriptionAdded(EndpointDescription endpointDescription) {
+ endpointChanged(endpointDescription, ADDED);
+ }
+
+ private void serviceDescriptionRemoved(EndpointDescription endpointDescription) {
+ endpointChanged(endpointDescription, REMOVED);
+ }
+
+ public void stop() {
+ if (bundleTracker != null) {
+ bundleTracker.close();
+ }
+ super.stop();
+ }
+
+ public Object addingBundle(Bundle bundle, BundleEvent event) {
+ if (bundle.getHeaders().get(Constants.FRAGMENT_HOST) != null || bundle.getBundleId() == 0) {
+ // Ignore fragments
+ return null;
+ }
+ Collection<URL> scaConfigs = getConfiguration(bundle, "SCA-Configuration", "OSGI-INF/sca-config/*.xml");
+ Collection<URL> descriptions = getConfiguration(bundle, "Remote-Service", null);
+ if (scaConfigs.isEmpty() && descriptions.isEmpty()) {
+ return null;
+ }
+ ExtenderConfiguration extender = new ExtenderConfiguration();
+ for (URL url : scaConfigs) {
+ try {
+ SCAConfig scaConfig = deployer.loadXMLDocument(url, deployer.createMonitor());
+ extender.scaConfigs.add(scaConfig);
+ } catch (Exception e) {
+ logger.log(Level.SEVERE, e.getMessage(), e);
+ throw new ServiceRuntimeException(e);
+ }
+ }
+ for (URL url : descriptions) {
+ try {
+ ServiceDescriptions sds = deployer.loadXMLDocument(url, deployer.createMonitor());
+ extender.remoteServiceDescriptions.add(sds);
+ } catch (Exception e) {
+ logger.log(Level.SEVERE, e.getMessage(), e);
+ // throw new ServiceRuntimeException(e);
+ }
+ }
+
+ // Add to the extenders before notifying the listeners (the endpoints may references to the config)
+ ExtenderConfiguration.validate(extenders, extender);
+ this.extenders.add(extender);
+
+ // Notify
+ for (ServiceDescriptions sds : extender.getRemoteServiceDescriptions()) {
+ for (ServiceDescription sd : sds) {
+ EndpointDescription sed = createEndpointDescription(sd);
+ endpointDescriptions.put(sed, bundle);
+ serviceDescriptionAdded(sed);
+ }
+ }
+
+ return extender;
+ }
+
+ public void modifiedBundle(Bundle bundle, BundleEvent event, Object object) {
+ // STARTING --> ACTIVE
+ }
+
+ public void removedBundle(Bundle bundle, BundleEvent event, Object object) {
+ if (object instanceof ExtenderConfiguration) {
+ extenders.remove((ExtenderConfiguration)object);
+ removeServicesDeclaredInBundle(bundle);
+ }
+ }
+
+ public Collection<ExtenderConfiguration> getConfigurations() {
+ return extenders;
+ }
+
+ public static class ExtenderConfiguration {
+ private Collection<SCAConfig> scaConfigs = new ArrayList<SCAConfig>();
+ private Collection<ServiceDescriptions> remoteServiceDescriptions = new ArrayList<ServiceDescriptions>();
+
+ public Collection<ServiceDescriptions> getRemoteServiceDescriptions() {
+ return remoteServiceDescriptions;
+ }
+
+ public Collection<SCAConfig> getSCAConfigs() {
+ return scaConfigs;
+ }
+
+ public static void validate(Collection<ExtenderConfiguration> configs, ExtenderConfiguration newConfig) {
+ Map<QName, Binding> bindings = new HashMap<QName, Binding>();
+ Map<QName, Intent> intents = new HashMap<QName, Intent>();
+ Map<QName, PolicySet> policySets = new HashMap<QName, PolicySet>();
+
+ for (ExtenderConfiguration config : configs) {
+ for (SCAConfig c : config.getSCAConfigs()) {
+ addBindings(bindings, c);
+ addIntents(intents, c);
+ addPolicySets(policySets, c);
+ }
+ }
+ for (SCAConfig c : newConfig.getSCAConfigs()) {
+ addBindings(bindings, c);
+ addIntents(intents, c);
+ addPolicySets(policySets, c);
+ }
+ }
+
+ private static void addIntents(Map<QName, Intent> intents, SCAConfig c) {
+ for (Intent i: c.getIntents()) {
+ if (intents.put(i.getName(), i) != null) {
+ throw new ServiceRuntimeException("Duplicate intent: " + i.getName());
+ }
+ }
+ }
+
+ private static void addPolicySets(Map<QName, PolicySet> policySets, SCAConfig c) {
+ for (PolicySet ps: c.getPolicySets()) {
+ if (policySets.put(ps.getName(), ps) != null) {
+ throw new ServiceRuntimeException("Duplicate policySet: " + ps.getName());
+ }
+ }
+ }
+
+ private static void addBindings(Map<QName, Binding> bindings, SCAConfig c) {
+ for (Binding b : c.getBindings()) {
+ QName name = new QName(c.getTargetNamespace(), b.getName());
+ if (bindings.put(name, b) != null) {
+ throw new ServiceRuntimeException("Duplicate binding: " + name);
+ }
+ }
+ }
+
+ }
+
+ @Override
+ protected Dictionary<String, Object> getProperties() {
+ Dictionary<String, Object> props = super.getProperties();
+ props.put(SUPPORTED_PROTOCOLS, new String[] {"local"});
+ return props;
+ }
+
+}