summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x
diff options
context:
space:
mode:
authorrfeng <rfeng@13f79535-47bb-0310-9956-ffa450edef68>2010-01-19 05:37:38 +0000
committerrfeng <rfeng@13f79535-47bb-0310-9956-ffa450edef68>2010-01-19 05:37:38 +0000
commit9da25a6532fcbb9ae66294f1ff9ea903653c83e5 (patch)
treea3b91c4a580b096d5bdaa38c0be963f4968b7d28 /sca-java-2.x
parent347f83ffa0b5cf5e69289dbf290c09b01a227c5c (diff)
Expose system definitions from the deployer
Add the removal of entries when the member leaves the group Add listeners to the hazelcastInstance git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@900661 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'sca-java-2.x')
-rw-r--r--sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java8
-rw-r--r--sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/Deployer.java13
-rw-r--r--sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/impl/DeployerImpl.java7
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java133
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java183
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java12
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java40
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java60
-rw-r--r--sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java189
-rw-r--r--sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointHelper.java24
-rw-r--r--sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java28
-rw-r--r--sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java16
12 files changed, 547 insertions, 166 deletions
diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
index 3dafaedf35..3e73960367 100644
--- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
+++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
@@ -508,8 +508,12 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint
bind(compositeContext);
}
}
- RuntimeEndpointImpl ep = (RuntimeEndpointImpl)serializer.readEndpoint(xml);
- copyFrom(ep);
+ if (serializer != null) {
+ RuntimeEndpointImpl ep = (RuntimeEndpointImpl)serializer.readEndpoint(xml);
+ copyFrom(ep);
+ } else {
+ // FIXME: [rfeng] What should we do here?
+ }
}
super.resolve();
}
diff --git a/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/Deployer.java b/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/Deployer.java
index b0b08cd49e..6d62446ccb 100644
--- a/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/Deployer.java
+++ b/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/Deployer.java
@@ -42,6 +42,7 @@ import org.apache.tuscany.sca.contribution.processor.ContributionWriteException;
import org.apache.tuscany.sca.contribution.processor.ProcessorContext;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.LifeCycleListener;
+import org.apache.tuscany.sca.definitions.Definitions;
import org.apache.tuscany.sca.monitor.Monitor;
/**
@@ -186,14 +187,8 @@ public interface Deployer extends LifeCycleListener {
*/
ExtensionPointRegistry getExtensionPointRegistry();
- /*
- * @see org.apache.tuscany.sca.core.LifeCycleListener#start()
- */
- void start();
-
- /*
- * @see org.apache.tuscany.sca.core.LifeCycleListener#stop()
+ /**
+ * Get the system definitions
*/
- void stop();
-
+ Definitions getSystemDefinitions();
}
diff --git a/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/impl/DeployerImpl.java b/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/impl/DeployerImpl.java
index b012fd795d..e4340102fc 100644
--- a/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/impl/DeployerImpl.java
+++ b/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/impl/DeployerImpl.java
@@ -50,8 +50,8 @@ import org.apache.tuscany.sca.contribution.ContributionFactory;
import org.apache.tuscany.sca.contribution.DefaultImport;
import org.apache.tuscany.sca.contribution.Export;
import org.apache.tuscany.sca.contribution.Import;
-import org.apache.tuscany.sca.contribution.namespace.NamespaceImport;
import org.apache.tuscany.sca.contribution.java.JavaImport;
+import org.apache.tuscany.sca.contribution.namespace.NamespaceImport;
import org.apache.tuscany.sca.contribution.processor.ContributionReadException;
import org.apache.tuscany.sca.contribution.processor.ContributionResolveException;
import org.apache.tuscany.sca.contribution.processor.ContributionWriteException;
@@ -568,4 +568,9 @@ public class DeployerImpl implements Deployer {
this.schemaValidationEnabled = schemaValidationEnabled;
}
+ public Definitions getSystemDefinitions() {
+ init();
+ return systemDefinitions;
+ }
+
}
diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java
index a65c1a9c23..f9ec30011e 100644
--- a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java
+++ b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java
@@ -38,14 +38,19 @@ import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
import com.hazelcast.config.Config;
import com.hazelcast.config.TcpIpConfig;
import com.hazelcast.config.XmlConfigBuilder;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.MembershipEvent;
+import com.hazelcast.core.MembershipListener;
import com.hazelcast.nio.Address;
/**
* An EndpointRegistry using a Hazelcast
*/
-public class HazelcastEndpointRegistry implements EndpointRegistry, LifeCycleListener {
+public class HazelcastEndpointRegistry implements EndpointRegistry, LifeCycleListener, EntryListener<String, Endpoint>, MembershipListener {
private final static Logger logger = Logger.getLogger(HazelcastEndpointRegistry.class.getName());
private List<EndpointReference> endpointreferences = new CopyOnWriteArrayList<EndpointReference>();
@@ -54,14 +59,14 @@ public class HazelcastEndpointRegistry implements EndpointRegistry, LifeCycleLis
private ExtensionPointRegistry registry;
private ConfigURI configURI;
- private HazelcastInstance hazelcastInstance;
- private Map<Object, Object> map;
+ HazelcastInstance hazelcastInstance;
+ Map<Object, Object> map;
private List<String> localEndpoints = new ArrayList<String>();;
public HazelcastEndpointRegistry(ExtensionPointRegistry registry,
- Map<String, String> attributes,
- String domainRegistryURI,
- String domainURI) {
+ Map<String, String> attributes,
+ String domainRegistryURI,
+ String domainURI) {
this.registry = registry;
this.configURI = new ConfigURI(domainRegistryURI);
}
@@ -74,22 +79,27 @@ public class HazelcastEndpointRegistry implements EndpointRegistry, LifeCycleLis
map = new HashMap<Object, Object>();
} else {
initHazelcastInstance();
- map = hazelcastInstance.getMap(configURI.getDomainName() + "Endpoints");
+ IMap imap = hazelcastInstance.getMap(configURI.getDomainName() + "/Endpoints");
+ imap.addEntryListener(this, true);
+ map = imap;
+ hazelcastInstance.getCluster().addMembershipListener(this);
}
}
public void stop() {
if (hazelcastInstance != null) {
hazelcastInstance.shutdown();
+ hazelcastInstance = null;
+ map = null;
}
}
- private void initHazelcastInstance() {
+ private void initHazelcastInstance() {
Config config = new XmlConfigBuilder().build();
config.setPort(configURI.getListenPort());
//config.setPortAutoIncrement(false);
-
+
if (configURI.getBindAddress() != null) {
config.getNetworkConfig().getInterfaces().setEnabled(true);
config.getNetworkConfig().getInterfaces().clear();
@@ -107,6 +117,8 @@ public class HazelcastEndpointRegistry implements EndpointRegistry, LifeCycleLis
config.getNetworkConfig().getJoin().getMulticastConfig().setMulticastGroup(configURI.getMulticastAddress());
}
+ // config.getMapConfig(configURI.getDomainName() + "/Endpoints").setBackupCount(0);
+
if (configURI.getRemotes().size() > 0) {
TcpIpConfig tcpconfig = config.getNetworkConfig().getJoin().getJoinMembers();
tcpconfig.setEnabled(true);
@@ -200,15 +212,15 @@ public class HazelcastEndpointRegistry implements EndpointRegistry, LifeCycleLis
endpoint.setRemote(true);
}
// if (!entry.isPrimary()) {
- ((RuntimeEndpoint) endpoint).bind(registry, this);
+ ((RuntimeEndpoint)endpoint).bind(registry, this);
// }
foundEndpoints.add(endpoint);
logger.fine("Found endpoint with matching service - " + endpoint);
- }
+ }
// else the service name doesn't match
}
}
-
+
return foundEndpoints;
}
@@ -252,46 +264,65 @@ public class HazelcastEndpointRegistry implements EndpointRegistry, LifeCycleLis
}
public void updateEndpoint(String uri, Endpoint endpoint) {
-// // TODO: is updateEndpoint needed?
-// throw new UnsupportedOperationException();
+ // // TODO: is updateEndpoint needed?
+ // throw new UnsupportedOperationException();
+ }
+
+ public void entryAdded(EntryEvent<String, Endpoint> event) {
+ entryAdded(event.getKey(), event.getValue());
}
-// public void entryAdded(Object key, Object value) {
-// MapEntry entry = (MapEntry)value;
-// Endpoint newEp = (Endpoint)entry.getValue();
-// if (!isLocal(entry)) {
-// logger.info(id + " Remote endpoint added: " + entry.getValue());
-// newEp.setRemote(true);
-// }
-// ((RuntimeEndpoint) newEp).bind(registry, this);
-// for (EndpointListener listener : listeners) {
-// listener.endpointAdded(newEp);
-// }
-// }
-//
-// public void entryRemoved(Object key, Object value) {
-// MapEntry entry = (MapEntry)value;
-// if (!isLocal(entry)) {
-// logger.info(id + " Remote endpoint removed: " + entry.getValue());
-// }
-// Endpoint oldEp = (Endpoint)entry.getValue();
-// for (EndpointListener listener : listeners) {
-// listener.endpointRemoved(oldEp);
-// }
-// }
-//
-// public void entryUpdated(Object key, Object oldValue, Object newValue) {
-// MapEntry oldEntry = (MapEntry)oldValue;
-// MapEntry newEntry = (MapEntry)newValue;
-// if (!isLocal(newEntry)) {
-// logger.info(id + " Remote endpoint updated: " + newEntry.getValue());
-// }
-// Endpoint oldEp = (Endpoint)oldEntry.getValue();
-// Endpoint newEp = (Endpoint)newEntry.getValue();
-// ((RuntimeEndpoint) newEp).bind(registry, this);
-// for (EndpointListener listener : listeners) {
-// listener.endpointUpdated(oldEp, newEp);
-// }
-// }
+ public void entryEvicted(EntryEvent<String, Endpoint> event) {
+ // Should not happen
+ }
+
+ public void entryRemoved(EntryEvent<String, Endpoint> event) {
+ entryRemoved(event.getKey(), event.getValue());
+ }
+
+ public void entryUpdated(EntryEvent<String, Endpoint> event) {
+ entryUpdated(event.getKey(), null, event.getValue());
+ }
+
+ public void entryAdded(Object key, Object value) {
+ Endpoint newEp = (Endpoint)value;
+ if (!isLocal(newEp)) {
+ logger.info(" Remote endpoint added: " + newEp);
+ newEp.setRemote(true);
+ }
+ ((RuntimeEndpoint)newEp).bind(registry, this);
+ for (EndpointListener listener : listeners) {
+ listener.endpointAdded(newEp);
+ }
+ }
+
+ public void entryRemoved(Object key, Object value) {
+ Endpoint oldEp = (Endpoint)value;
+ if (!isLocal(oldEp)) {
+ logger.info(" Remote endpoint removed: " + value);
+ }
+ ((RuntimeEndpoint) oldEp).bind(registry, this);
+ for (EndpointListener listener : listeners) {
+ listener.endpointRemoved(oldEp);
+ }
+ }
+
+ public void entryUpdated(Object key, Object oldValue, Object newValue) {
+ Endpoint oldEp = (Endpoint)oldValue;
+ Endpoint newEp = (Endpoint)newValue;
+ if (!isLocal(newEp)) {
+ logger.info(" Remote endpoint updated: " + newEp);
+ }
+ ((RuntimeEndpoint)newEp).bind(registry, this);
+ for (EndpointListener listener : listeners) {
+ listener.endpointUpdated(oldEp, newEp);
+ }
+ }
+
+ public void memberAdded(MembershipEvent event) {
+ }
+
+ public void memberRemoved(MembershipEvent event) {
+ }
}
diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java
new file mode 100644
index 0000000000..32e49dbe41
--- /dev/null
+++ b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.endpoint.hazelcast;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.tuscany.sca.assembly.AssemblyFactory;
+import org.apache.tuscany.sca.assembly.Binding;
+import org.apache.tuscany.sca.assembly.Component;
+import org.apache.tuscany.sca.assembly.Endpoint;
+import org.apache.tuscany.sca.assembly.SCABindingFactory;
+import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.FactoryExtensionPoint;
+import org.apache.tuscany.sca.runtime.EndpointListener;
+import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.hazelcast.core.IMap;
+
+// Ignore so its not run in the build yet till its working
+@Ignore("Hazelcast doesn't support the map entry management by members")
+public class MultiRegTestCase implements EndpointListener {
+ private static ExtensionPointRegistry extensionPoints;
+ private static AssemblyFactory assemblyFactory;
+ private static SCABindingFactory scaBindingFactory;
+
+ @BeforeClass
+ public static void init() {
+ extensionPoints = new DefaultExtensionPointRegistry();
+ FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class);
+ assemblyFactory = factories.getFactory(AssemblyFactory.class);
+ scaBindingFactory = factories.getFactory(SCABindingFactory.class);
+ }
+
+ @Test
+ public void testReplication() throws Exception {
+ RuntimeEndpoint ep1 = createEndpoint("ep1uri");
+
+ String host = InetAddress.getLocalHost().getHostAddress();
+ String bind = null; // "9.65.158.31";
+ String port1 = "8085";
+ String port2 = "8086";
+ String port3 = "8087";
+ String range = "1";
+
+ Map<String, String> attrs1 = new HashMap<String, String>();
+ // attrs1.put("nomcast", "true");
+ attrs1.put("bind", bind);
+ attrs1.put("receiverPort", port1);
+ attrs1.put("receiverAutoBind", range);
+ // attrs1.put("routes", host + ":" + port2 + " " + host + ":" + port3);
+ HazelcastEndpointRegistry reg1 = new HazelcastEndpointRegistry(extensionPoints, attrs1, "tuscany:foo", "bar");
+ reg1.addListener(this);
+ reg1.start();
+
+ Map<String, String> attrs2 = new HashMap<String, String>();
+ // attrs2.put("nomcast", "true");
+ attrs1.put("bind", bind);
+ attrs2.put("receiverPort", port2);
+ attrs2.put("receiverAutoBind", range);
+ // attrs2.put("routes", host + ":"+port1);
+ HazelcastEndpointRegistry reg2 = new HazelcastEndpointRegistry(extensionPoints, attrs2, "tuscany:foo", "bar");
+ reg2.addListener(this);
+ reg2.start();
+
+ Map<String, String> attrs3 = new HashMap<String, String>();
+ // attrs3.put("nomcast", "true");
+ attrs1.put("bind", bind);
+ attrs3.put("receiverPort", port3);
+ attrs3.put("receiverAutoBind", range);
+ // attrs3.put("routes", host + ":"+port1);
+ HazelcastEndpointRegistry reg3 = new HazelcastEndpointRegistry(extensionPoints, attrs3, "tuscany:foo", "bar");
+ reg3.addListener(this);
+ reg3.start();
+
+ ep1.bind(extensionPoints, reg1);
+ reg1.addEndpoint(ep1);
+ assertExists(reg1, "ep1uri");
+ assertExists(reg2, "ep1uri");
+ assertExists(reg3, "ep1uri");
+
+ RuntimeEndpoint ep2 = createEndpoint("ep2uri");
+ ep2.bind(extensionPoints, reg2);
+ reg2.addEndpoint(ep2);
+ assertExists(reg2, "ep2uri");
+ assertExists(reg1, "ep2uri");
+ assertExists(reg3, "ep2uri");
+
+ System.out.println(((IMap)reg1.map).localKeySet().size());
+ System.out.println(((IMap)reg2.map).localKeySet().size());
+ System.out.println(((IMap)reg3.map).localKeySet().size());
+
+ reg1.stop();
+ Thread.sleep(6000);
+ Assert.assertNull(reg2.getEndpoint("ep1uri"));
+ Assert.assertNull(reg3.getEndpoint("ep1uri"));
+
+ System.out.println(((IMap)reg2.map).localKeySet().size());
+ System.out.println(((IMap)reg3.map).localKeySet().size());
+
+ assertExists(reg2, "ep2uri");
+ assertExists(reg3, "ep2uri");
+
+ reg1.start();
+ ep1.bind(extensionPoints, reg1);
+ reg1.addEndpoint(ep1);
+ assertExists(reg1, "ep1uri");
+ assertExists(reg2, "ep1uri");
+ assertExists(reg3, "ep1uri");
+
+ reg1.stop();
+ reg2.stop();
+ reg3.stop();
+ System.out.println(); // closed
+ }
+
+ private Endpoint assertExists(HazelcastEndpointRegistry reg, String uri) throws InterruptedException {
+ Endpoint ep = null;
+ int count = 0;
+ while (ep == null && count < 15) {
+ ep = reg.getEndpoint(uri);
+ if (ep == null) {
+ Thread.sleep(1000);
+ System.out.println(reg + ": tries=" + count);
+ }
+ count++;
+ }
+ Assert.assertNotNull(ep);
+ Assert.assertEquals(uri, ep.getURI());
+ return ep;
+ }
+
+ private RuntimeEndpoint createEndpoint(String uri) {
+ RuntimeEndpoint ep = (RuntimeEndpoint) assemblyFactory.createEndpoint();
+ Component comp = assemblyFactory.createComponent();
+ ep.setComponent(comp);
+ ep.setService(assemblyFactory.createComponentService());
+ Binding b = scaBindingFactory.createSCABinding();
+ ep.setBinding(b);
+ ep.setURI(uri);
+ return ep;
+ }
+
+ private void print(String prefix, Endpoint ep) {
+ System.out.println(prefix + ": "+ep);
+ }
+
+ public void endpointAdded(Endpoint endpoint) {
+ print("Added", endpoint);
+ }
+
+ public void endpointRemoved(Endpoint endpoint) {
+ print("Removed", endpoint);
+ }
+
+ public void endpointUpdated(Endpoint oldEndpoint, Endpoint newEndpoint) {
+ print("Updated", newEndpoint);
+ }
+
+}
diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java
index 47b3450594..fe683af025 100644
--- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java
+++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java
@@ -708,7 +708,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb
// [rfeng] Change the behavior to replicate to all nodes
if (entry.isPrimary() && self.equals(entry.getPrimary())) {
try {
- Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes());
+ Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
entry.setBackupNodes(backup);
entry.setPrimary(self);
} catch (ChannelException x) {
@@ -768,7 +768,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb
if (log.isDebugEnabled())
log.debug("[1] Primary choosing a new backup");
try {
- Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes());
+ Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
entry.setBackupNodes(backup);
entry.setPrimary(channel.getLocalMember(false));
} catch (ChannelException x) {
@@ -798,7 +798,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb
entry.setPrimary(channel.getLocalMember(false));
entry.setBackup(false);
entry.setProxy(false);
- Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes());
+ Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
entry.setBackupNodes(backup);
if (mapOwner != null)
mapOwner.objectMadePrimay(entry.getKey(), entry.getValue());
@@ -833,7 +833,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb
return members[node];
}
- protected abstract Member[] publishEntryInfo(Object key, Object value, Member[] backupNodes) throws ChannelException;
+ protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException;
public void heartbeat() {
try {
@@ -916,7 +916,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb
}
if (entry.isBackup()) {
//select a new backup node
- backup = publishEntryInfo(key, entry.getValue(), entry.getBackupNodes());
+ backup = publishEntryInfo(key, entry.getValue());
} else if (entry.isProxy()) {
//invalidate the previous primary
msg =
@@ -997,7 +997,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb
old = remove(key);
try {
if (notify) {
- Member[] backup = publishEntryInfo(key, value, entry.getBackupNodes());
+ Member[] backup = publishEntryInfo(key, value);
entry.setBackupNodes(backup);
}
} catch (ChannelException x) {
diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
index 0c2144ea49..42d2eda6a3 100644
--- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
+++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
@@ -38,11 +38,13 @@ import java.util.logging.Logger;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelReceiver;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.GroupChannel;
import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
import org.apache.catalina.tribes.membership.McastService;
import org.apache.catalina.tribes.membership.StaticMember;
+import org.apache.catalina.tribes.transport.ReceiverBase;
import org.apache.tuscany.sca.assembly.Endpoint;
import org.apache.tuscany.sca.assembly.EndpointReference;
import org.apache.tuscany.sca.core.ExtensionPointRegistry;
@@ -67,6 +69,9 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
private String address = MULTICAST_ADDRESS;
private String bind = null;
private int timeout = 50;
+ private int receiverPort = 4000;
+ private int receiverAutoBind = 100;
+ private List<URI> staticRoutes;
private final static String DEFAULT_DOMAIN_URI = "http://tuscany.apache.org/sca/1.1/domains/default";
private String domainURI = DEFAULT_DOMAIN_URI;
@@ -75,7 +80,6 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
private ExtensionPointRegistry registry;
private ReplicatedMap map;
- private static List<URI> staticRoutes;
private String id;
private boolean noMultiCast;
@@ -175,6 +179,14 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
if (mcast != null) {
noMultiCast = Boolean.valueOf(mcast);
}
+ String recvPort = attributes.get("receiverPort");
+ if (recvPort != null) {
+ receiverPort = Integer.parseInt(recvPort);
+ }
+ String recvAutoBind = attributes.get("receiverAutoBind");
+ if (recvAutoBind != null) {
+ receiverAutoBind = Integer.parseInt(recvAutoBind);
+ }
}
public void start() {
@@ -190,9 +202,24 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
if (noMultiCast) {
map.getChannel().addInterceptor(new DisableMcastInterceptor());
}
-
- // http://www.mail-archive.com/users@tomcat.apache.org/msg24873.html
- int port = channel.getChannelReceiver().getPort();
+
+ // Configure the receiver ports
+ ChannelReceiver receiver = channel.getChannelReceiver();
+ if (receiver instanceof ReceiverBase) {
+ ((ReceiverBase)receiver).setAutoBind(receiverAutoBind);
+ ((ReceiverBase)receiver).setPort(receiverPort);
+ }
+
+ /*
+ Object sender = channel.getChannelSender();
+ if (sender instanceof ReplicationTransmitter) {
+ sender = ((ReplicationTransmitter)sender).getTransport();
+ }
+ if (sender instanceof AbstractSender) {
+ ((AbstractSender)sender).setKeepAliveCount(0);
+ ((AbstractSender)sender).setMaxRetryAttempts(5);
+ }
+ */
if (staticRoutes != null) {
StaticMembershipInterceptor smi = new StaticMembershipInterceptor();
@@ -200,12 +227,12 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
Member member;
try {
// The port has to match the receiver port
- member = new StaticMember(staticRoute.getHost(), port, 5000);
+ member = new StaticMember(staticRoute.getHost(), staticRoute.getPort(), 5000);
} catch (IOException e) {
throw new RuntimeException(e);
}
smi.addStaticMember(member);
- logger.info("Added static route: " + staticRoute.getHost() + ":" + port);
+ logger.info("Added static route: " + staticRoute.getHost() + ":" + staticRoute.getPort());
}
smi.setLocalMember(map.getChannel().getLocalMember(false));
map.getChannel().addInterceptor(smi);
@@ -410,6 +437,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi
logger.info(id + " Remote endpoint removed: " + entry.getValue());
}
Endpoint oldEp = (Endpoint)entry.getValue();
+ ((RuntimeEndpoint) oldEp).bind(registry, this);
for (EndpointListener listener : listeners) {
listener.endpointRemoved(oldEp);
}
diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java
index 762407604d..669ad82192 100644
--- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java
+++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java
@@ -17,6 +17,8 @@
package org.apache.tuscany.sca.endpoint.tribes;
import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
@@ -97,29 +99,65 @@ public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback,
* @return Member - the backup node
* @throws ChannelException
*/
- protected Member[] publishEntryInfo(Object key, Object value, Member[] backupNodes) throws ChannelException {
+ protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException {
if (!(key instanceof Serializable && value instanceof Serializable))
return new Member[0];
//select a backup node
- Member[] backup = getMapMembers();
+ Member[] members = getMapMembers();
- if (backup == null || backup.length == 0)
- return null;
-
- // Set the receivers to these members that are not in the backup nodes yet
- Member[] members = backup;
- if (backupNodes != null) {
- members = getMapMembersExcl(backupNodes);
+ if (members == null || members.length == 0) {
+ return new Member[0];
}
//publish the data out to all nodes
MapMessage msg =
new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable)key, (Serializable)value,
- null, channel.getLocalMember(false), backup);
+ null, channel.getLocalMember(false), members);
getChannel().send(members, msg, getChannelSendOptions());
- return backup;
+ return members;
+ }
+
+ /**
+ * Override the base method to look up existing entries only
+ */
+ public Object get(Object key) {
+ MapEntry entry = super.getInternal(key);
+ if (log.isTraceEnabled())
+ log.trace("Requesting id:" + key + " entry:" + entry);
+ if (entry == null) {
+ return null;
+ }
+ return entry.getValue();
}
+ /**
+ * Override the base method to remove all entries owned by the member that disappeared
+ */
+ public void memberDisappeared(Member member) {
+ boolean removed = false;
+ synchronized (mapMembers) {
+ removed = (mapMembers.remove(member) != null);
+ if (!removed) {
+ if (log.isDebugEnabled())
+ log.debug("Member[" + member + "] disappeared, but was not present in the map.");
+ return; //the member was not part of our map.
+ }
+ }
+
+ Iterator<Map.Entry<Object, Object>> i = super.entrySetFull().iterator();
+ while (i.hasNext()) {
+ Map.Entry<Object, Object> e = i.next();
+ MapEntry entry = (MapEntry)super.getInternal(e.getKey());
+ if (entry == null) {
+ continue;
+ }
+ if (member.equals(entry.getPrimary())) {
+ if (log.isDebugEnabled())
+ log.debug("[2] Primary disappeared");
+ i.remove();
+ } //end if
+ } //while
+ }
}
diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java
index d329ebd066..a470c47ba0 100644
--- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java
+++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java
@@ -19,6 +19,7 @@
package org.apache.tuscany.sca.endpoint.tribes;
+import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
@@ -28,109 +29,141 @@ import org.apache.tuscany.sca.assembly.Component;
import org.apache.tuscany.sca.assembly.Endpoint;
import org.apache.tuscany.sca.assembly.SCABindingFactory;
import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
import org.apache.tuscany.sca.core.FactoryExtensionPoint;
+import org.apache.tuscany.sca.runtime.EndpointListener;
+import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
import org.junit.Assert;
-import org.junit.Ignore;
+import org.junit.BeforeClass;
import org.junit.Test;
// Ignore so its not run in the build yet till its working
-@Ignore
-public class MultiRegTestCase {
-
-// @Test
-// public void testTwoNodesMultiCast() throws InterruptedException {
-// DefaultExtensionPointRegistry extensionPoints = new DefaultExtensionPointRegistry();
-// FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class);
-// AssemblyFactory assemblyFactory = factories.getFactory(AssemblyFactory.class);
-//
-// ReplicatedEndpointRegistry reg1 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar");
-// reg1.start();
-//
-// Endpoint ep1 = assemblyFactory.createEndpoint();
-// Component comp = assemblyFactory.createComponent();
-// ep1.setComponent(comp);
-// ep1.setService(assemblyFactory.createComponentService());
-// Binding b = new SCABindingFactoryImpl().createSCABinding();
-// ep1.setBinding(b);
-// ep1.setURI("ep1uri");
-// reg1.addEndpoint(ep1);
-//
-// Endpoint ep1p = reg1.getEndpoint("ep1uri");
-// Assert.assertNotNull(ep1p);
-// Assert.assertEquals("ep1uri", ep1p.getURI());
-//
-// ReplicatedEndpointRegistry reg2 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar");
-// reg2.start();
-// Thread.sleep(5000);
-//
-// Endpoint ep1p2 = reg2.getEndpoint("ep1uri");
-// Assert.assertNotNull(ep1p2);
-// Assert.assertEquals("ep1uri", ep1p2.getURI());
-//
-// reg1.stop();
-// reg2.stop();
-// }
+public class MultiRegTestCase implements EndpointListener {
+ private static ExtensionPointRegistry extensionPoints;
+ private static AssemblyFactory assemblyFactory;
+ private static SCABindingFactory scaBindingFactory;
- @Test
- public void testTwoNodesStaticNoMultiCast() throws InterruptedException {
- DefaultExtensionPointRegistry extensionPoints = new DefaultExtensionPointRegistry();
+ @BeforeClass
+ public static void init() {
+ extensionPoints = new DefaultExtensionPointRegistry();
FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class);
- AssemblyFactory assemblyFactory = factories.getFactory(AssemblyFactory.class);
+ assemblyFactory = factories.getFactory(AssemblyFactory.class);
+ scaBindingFactory = factories.getFactory(SCABindingFactory.class);
+ }
+
+ @Test
+ public void testReplication() throws Exception {
+ RuntimeEndpoint ep1 = createEndpoint("ep1uri");
+
+ String host = InetAddress.getLocalHost().getHostAddress();
+ String bind = null; // "9.65.158.31";
+ String port1 = "8085";
+ String port2 = "8086";
+ String port3 = "8087";
+ String range = "1";
Map<String, String> attrs1 = new HashMap<String, String>();
- attrs1.put("nomcast", "true");
- attrs1.put("routes", "9.167.197.91:4001 9.167.197.91:4002");
+ // attrs1.put("nomcast", "true");
+ attrs1.put("bind", bind);
+ attrs1.put("receiverPort", port1);
+ attrs1.put("receiverAutoBind", range);
+ // attrs1.put("routes", host + ":" + port2 + " " + host + ":" + port3);
ReplicatedEndpointRegistry reg1 = new ReplicatedEndpointRegistry(extensionPoints, attrs1, "foo", "bar");
+ reg1.addListener(this);
reg1.start();
- Endpoint ep1 = assemblyFactory.createEndpoint();
- Component comp = assemblyFactory.createComponent();
- ep1.setComponent(comp);
- ep1.setService(assemblyFactory.createComponentService());
- Binding b = factories.getFactory(SCABindingFactory.class).createSCABinding();
- ep1.setBinding(b);
- ep1.setURI("ep1uri");
- reg1.addEndpoint(ep1);
-
- Endpoint ep1p = reg1.getEndpoint("ep1uri");
- Assert.assertNotNull(ep1p);
- Assert.assertEquals("ep1uri", ep1p.getURI());
-
Map<String, String> attrs2 = new HashMap<String, String>();
- attrs2.put("nomcast", "true");
- attrs2.put("routes", "9.167.197.91:4000");
+ // attrs2.put("nomcast", "true");
+ attrs1.put("bind", bind);
+ attrs2.put("receiverPort", port2);
+ attrs2.put("receiverAutoBind", range);
+ // attrs2.put("routes", host + ":"+port1);
ReplicatedEndpointRegistry reg2 = new ReplicatedEndpointRegistry(extensionPoints, attrs2, "foo", "bar");
+ reg2.addListener(this);
reg2.start();
-
- System.out.println("wait");
- Thread.sleep(10000);
- System.out.println("run");
-
- Endpoint ep1p2 = reg2.getEndpoint("ep1uri");
- Assert.assertNotNull(ep1p2);
- Assert.assertEquals("ep1uri", ep1p2.getURI());
Map<String, String> attrs3 = new HashMap<String, String>();
- attrs3.put("nomcast", "true");
- attrs3.put("routes", "9.167.197.91:4000");
+ // attrs3.put("nomcast", "true");
+ attrs1.put("bind", bind);
+ attrs3.put("receiverPort", port3);
+ attrs3.put("receiverAutoBind", range);
+ // attrs3.put("routes", host + ":"+port1);
ReplicatedEndpointRegistry reg3 = new ReplicatedEndpointRegistry(extensionPoints, attrs3, "foo", "bar");
+ reg3.addListener(this);
reg3.start();
-
- System.out.println("wait");
- Thread.sleep(5000);
- System.out.println("run");
- Endpoint ep1p3 = reg3.getEndpoint("ep1uri");
- Assert.assertNotNull(ep1p3);
- Assert.assertEquals("ep1uri", ep1p3.getURI());
+ ep1.bind(extensionPoints, reg1);
+ reg1.addEndpoint(ep1);
+ assertExists(reg1, "ep1uri");
+ assertExists(reg2, "ep1uri");
+ assertExists(reg3, "ep1uri");
+
+ RuntimeEndpoint ep2 = createEndpoint("ep2uri");
+ ep2.bind(extensionPoints, reg2);
+ reg2.addEndpoint(ep2);
+ assertExists(reg2, "ep2uri");
+ assertExists(reg1, "ep2uri");
+ assertExists(reg3, "ep2uri");
+ reg1.stop();
+ Thread.sleep(6000);
+ Assert.assertNull(reg2.getEndpoint("ep1uri"));
+ Assert.assertNull(reg3.getEndpoint("ep1uri"));
+ assertExists(reg2, "ep2uri");
+ assertExists(reg3, "ep2uri");
+
+ reg1.start();
+ ep1.bind(extensionPoints, reg1);
+ reg1.addEndpoint(ep1);
+ assertExists(reg1, "ep1uri");
+ assertExists(reg2, "ep1uri");
+ assertExists(reg3, "ep1uri");
- System.out.println("wait2");
- Thread.sleep(5000);
- System.out.println("end");
reg1.stop();
reg2.stop();
reg3.stop();
+ System.out.println(); // closed
+ }
+
+ private Endpoint assertExists(ReplicatedEndpointRegistry reg, String uri) throws InterruptedException {
+ Endpoint ep = null;
+ int count = 0;
+ while (ep == null && count < 15) {
+ ep = reg.getEndpoint(uri);
+ Thread.sleep(1000);
+ count++;
+ System.out.println(reg + ": tries=" + count);
+ }
+ Assert.assertNotNull(ep);
+ Assert.assertEquals(uri, ep.getURI());
+ return ep;
+ }
+
+ private RuntimeEndpoint createEndpoint(String uri) {
+ RuntimeEndpoint ep = (RuntimeEndpoint) assemblyFactory.createEndpoint();
+ Component comp = assemblyFactory.createComponent();
+ ep.setComponent(comp);
+ ep.setService(assemblyFactory.createComponentService());
+ Binding b = scaBindingFactory.createSCABinding();
+ ep.setBinding(b);
+ ep.setURI(uri);
+ return ep;
+ }
+
+ private void print(String prefix, Endpoint ep) {
+ System.out.println(prefix + ": "+ep);
+ }
+
+ public void endpointAdded(Endpoint endpoint) {
+ print("Added", endpoint);
+ }
+
+ public void endpointRemoved(Endpoint endpoint) {
+ print("Removed", endpoint);
+ }
+
+ public void endpointUpdated(Endpoint oldEndpoint, Endpoint newEndpoint) {
+ print("Updated", newEndpoint);
}
}
diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointHelper.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointHelper.java
index 949d2d8af7..8ecc5f7ea8 100644
--- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointHelper.java
+++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointHelper.java
@@ -69,6 +69,30 @@ public class EndpointHelper {
if (serviceID != null) {
props.put(RemoteConstants.ENDPOINT_SERVICE_ID, Long.parseLong(serviceID));
}
+
+ // FIXME: [rfeng] We need to calculate the intents supported by this endpoint
+ /*
+ QName bindingTypeName = endpoint.getBinding().getType();
+ Definitions definitions = null;
+ if(definitions!=null) {
+ for(BindingType bindingType: definitions.getBindingTypes()) {
+ if(bindingType.getType().equals(bindingTypeName)) {
+ bindingType.getAlwaysProvidedIntents();
+ }
+ }
+ */
+
+ String intents = (String)props.get(RemoteConstants.SERVICE_EXPORTED_INTENTS);
+ String extraIntents = (String)props.get(RemoteConstants.SERVICE_EXPORTED_INTENTS_EXTRA);
+ if (intents == null) {
+ intents = "";
+ }
+ if (extraIntents != null) {
+ intents = intents + " " + extraIntents;
+ }
+
+ props.put(RemoteConstants.SERVICE_INTENTS, intents.trim());
+
props.put(RemoteConstants.ENDPOINT_ID, endpoint.getURI());
// FIXME: [rfeng] How to pass in the remote service id from the endpoint XML
props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, new String[] {"org.osgi.sca"});
diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java
index 608c74bcfc..f4521cfbfc 100644
--- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java
+++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java
@@ -27,6 +27,12 @@ import java.util.Hashtable;
import java.util.List;
import java.util.Map;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.UtilityExtensionPoint;
+import org.apache.tuscany.sca.definitions.Definitions;
+import org.apache.tuscany.sca.deployment.Deployer;
+import org.apache.tuscany.sca.policy.BindingType;
+import org.apache.tuscany.sca.policy.Intent;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
@@ -73,10 +79,28 @@ public class RemoteServiceAdminImpl implements RemoteServiceAdmin, ManagedServic
importer.start();
Hashtable<String, Object> props = new Hashtable<String, Object>();
props.put(RemoteConstants.REMOTE_CONFIGS_SUPPORTED, new String[] {"org.osgi.sca"});
+
+ ExtensionPointRegistry registry = exporter.getExtensionPointRegistry();
+ UtilityExtensionPoint utilities = registry.getExtensionPoint(UtilityExtensionPoint.class);
+ Deployer deployer = utilities.getUtility(Deployer.class);
+ Definitions definitions = deployer.getSystemDefinitions();
+
+ String[] intents = new String[definitions.getIntents().size()];
+ int i = 0;
+ for (Intent intent : definitions.getIntents()) {
+ intents[i++] = intent.toString();
+ }
+
+ String[] bindingTypes = new String[definitions.getBindingTypes().size()];
+ i = 0;
+ for (BindingType bindingType : definitions.getBindingTypes()) {
+ bindingTypes[i++] = bindingType.getType().toString();
+ }
+
// FIXME: We should ask SCA domain for the supported intents
- props.put(RemoteConstants.REMOTE_INTENTS_SUPPORTED, new String[] {});
+ props.put(RemoteConstants.REMOTE_INTENTS_SUPPORTED, intents);
// FIXME: We should ask SCA domain for the supported binding types
- props.put("org.osgi.sca.binding.types", new String[] {});
+ props.put("org.osgi.sca.binding.types", bindingTypes);
registration = context.registerService(RemoteServiceAdmin.class.getName(), this, props);
props = new Hashtable<String, Object>();
diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java
index f6e9855556..a7c6d04ee9 100644
--- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java
+++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java
@@ -53,6 +53,19 @@ public class DomainDiscoveryService extends AbstractDiscoveryService implements
this.domainRegistryFactory =
registry.getExtensionPoint(UtilityExtensionPoint.class).getUtility(DomainRegistryFactory.class);
domainRegistryFactory.addListener(this);
+
+ // [rfeng] Starting of the endpoint registry takes a long time and it leaves the bundle
+ // state to be starting. When the registry is started, remote endpoints come in and that
+ // triggers the classloading from this bundle.
+ Thread thread = new Thread() {
+ public void run() {
+ startEndpointRegistry();
+ }
+ };
+ thread.start();
+ }
+
+ private void startEndpointRegistry() {
// The following code forced the start() of the domain registry in absense of services
String domainRegistry = context.getProperty("org.osgi.sca.domain.registry");
if (domainRegistry == null) {
@@ -103,6 +116,8 @@ public class DomainDiscoveryService extends AbstractDiscoveryService implements
{
// Notify the endpoint listeners
EndpointDescription description = createEndpointDescription(bundleContext, endpoint);
+ // Set the owning bundle to runtime bundle to avoid NPE
+ servicesInfo.put(description, context.getBundle());
endpointChanged(description, ADDED);
}
}
@@ -115,6 +130,7 @@ public class DomainDiscoveryService extends AbstractDiscoveryService implements
*/
{
EndpointDescription description = createEndpointDescription(context, endpoint);
+ servicesInfo.remove(description);
endpointChanged(description, REMOVED);
}
}