From 9da25a6532fcbb9ae66294f1ff9ea903653c83e5 Mon Sep 17 00:00:00 2001 From: rfeng Date: Tue, 19 Jan 2010 05:37:38 +0000 Subject: Expose system definitions from the deployer Add the removal of entries when the member leaves the group Add listeners to the hazelcastInstance git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@900661 13f79535-47bb-0310-9956-ffa450edef68 --- .../core/assembly/impl/RuntimeEndpointImpl.java | 8 +- .../apache/tuscany/sca/deployment/Deployer.java | 13 +- .../tuscany/sca/deployment/impl/DeployerImpl.java | 7 +- .../hazelcast/HazelcastEndpointRegistry.java | 133 +++++++++------ .../sca/endpoint/hazelcast/MultiRegTestCase.java | 183 ++++++++++++++++++++ .../sca/endpoint/tribes/AbstractReplicatedMap.java | 12 +- .../tribes/ReplicatedEndpointRegistry.java | 40 ++++- .../tuscany/sca/endpoint/tribes/ReplicatedMap.java | 60 +++++-- .../sca/endpoint/tribes/MultiRegTestCase.java | 189 ++++++++++++--------- .../remoteserviceadmin/impl/EndpointHelper.java | 24 +++ .../impl/RemoteServiceAdminImpl.java | 28 ++- .../discovery/impl/DomainDiscoveryService.java | 16 ++ 12 files changed, 547 insertions(+), 166 deletions(-) create mode 100644 sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java (limited to 'sca-java-2.x/trunk/modules') diff --git a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java index 3dafaedf35..3e73960367 100644 --- a/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java +++ b/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java @@ -508,8 +508,12 @@ public class RuntimeEndpointImpl extends EndpointImpl implements RuntimeEndpoint bind(compositeContext); } } - RuntimeEndpointImpl ep = (RuntimeEndpointImpl)serializer.readEndpoint(xml); - copyFrom(ep); + if (serializer != null) { + RuntimeEndpointImpl ep = (RuntimeEndpointImpl)serializer.readEndpoint(xml); + copyFrom(ep); + } else { + // FIXME: [rfeng] What should we do here? + } } super.resolve(); } diff --git a/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/Deployer.java b/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/Deployer.java index b0b08cd49e..6d62446ccb 100644 --- a/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/Deployer.java +++ b/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/Deployer.java @@ -42,6 +42,7 @@ import org.apache.tuscany.sca.contribution.processor.ContributionWriteException; import org.apache.tuscany.sca.contribution.processor.ProcessorContext; import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.core.LifeCycleListener; +import org.apache.tuscany.sca.definitions.Definitions; import org.apache.tuscany.sca.monitor.Monitor; /** @@ -186,14 +187,8 @@ public interface Deployer extends LifeCycleListener { */ ExtensionPointRegistry getExtensionPointRegistry(); - /* - * @see org.apache.tuscany.sca.core.LifeCycleListener#start() - */ - void start(); - - /* - * @see org.apache.tuscany.sca.core.LifeCycleListener#stop() + /** + * Get the system definitions */ - void stop(); - + Definitions getSystemDefinitions(); } diff --git a/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/impl/DeployerImpl.java b/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/impl/DeployerImpl.java index b012fd795d..e4340102fc 100644 --- a/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/impl/DeployerImpl.java +++ b/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/impl/DeployerImpl.java @@ -50,8 +50,8 @@ import org.apache.tuscany.sca.contribution.ContributionFactory; import org.apache.tuscany.sca.contribution.DefaultImport; import org.apache.tuscany.sca.contribution.Export; import org.apache.tuscany.sca.contribution.Import; -import org.apache.tuscany.sca.contribution.namespace.NamespaceImport; import org.apache.tuscany.sca.contribution.java.JavaImport; +import org.apache.tuscany.sca.contribution.namespace.NamespaceImport; import org.apache.tuscany.sca.contribution.processor.ContributionReadException; import org.apache.tuscany.sca.contribution.processor.ContributionResolveException; import org.apache.tuscany.sca.contribution.processor.ContributionWriteException; @@ -568,4 +568,9 @@ public class DeployerImpl implements Deployer { this.schemaValidationEnabled = schemaValidationEnabled; } + public Definitions getSystemDefinitions() { + init(); + return systemDefinitions; + } + } diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java index a65c1a9c23..f9ec30011e 100644 --- a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java +++ b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java @@ -38,14 +38,19 @@ import org.apache.tuscany.sca.runtime.RuntimeEndpoint; import com.hazelcast.config.Config; import com.hazelcast.config.TcpIpConfig; import com.hazelcast.config.XmlConfigBuilder; +import com.hazelcast.core.EntryEvent; +import com.hazelcast.core.EntryListener; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.core.IMap; +import com.hazelcast.core.MembershipEvent; +import com.hazelcast.core.MembershipListener; import com.hazelcast.nio.Address; /** * An EndpointRegistry using a Hazelcast */ -public class HazelcastEndpointRegistry implements EndpointRegistry, LifeCycleListener { +public class HazelcastEndpointRegistry implements EndpointRegistry, LifeCycleListener, EntryListener, MembershipListener { private final static Logger logger = Logger.getLogger(HazelcastEndpointRegistry.class.getName()); private List endpointreferences = new CopyOnWriteArrayList(); @@ -54,14 +59,14 @@ public class HazelcastEndpointRegistry implements EndpointRegistry, LifeCycleLis private ExtensionPointRegistry registry; private ConfigURI configURI; - private HazelcastInstance hazelcastInstance; - private Map map; + HazelcastInstance hazelcastInstance; + Map map; private List localEndpoints = new ArrayList();; public HazelcastEndpointRegistry(ExtensionPointRegistry registry, - Map attributes, - String domainRegistryURI, - String domainURI) { + Map attributes, + String domainRegistryURI, + String domainURI) { this.registry = registry; this.configURI = new ConfigURI(domainRegistryURI); } @@ -74,22 +79,27 @@ public class HazelcastEndpointRegistry implements EndpointRegistry, LifeCycleLis map = new HashMap(); } else { initHazelcastInstance(); - map = hazelcastInstance.getMap(configURI.getDomainName() + "Endpoints"); + IMap imap = hazelcastInstance.getMap(configURI.getDomainName() + "/Endpoints"); + imap.addEntryListener(this, true); + map = imap; + hazelcastInstance.getCluster().addMembershipListener(this); } } public void stop() { if (hazelcastInstance != null) { hazelcastInstance.shutdown(); + hazelcastInstance = null; + map = null; } } - private void initHazelcastInstance() { + private void initHazelcastInstance() { Config config = new XmlConfigBuilder().build(); config.setPort(configURI.getListenPort()); //config.setPortAutoIncrement(false); - + if (configURI.getBindAddress() != null) { config.getNetworkConfig().getInterfaces().setEnabled(true); config.getNetworkConfig().getInterfaces().clear(); @@ -107,6 +117,8 @@ public class HazelcastEndpointRegistry implements EndpointRegistry, LifeCycleLis config.getNetworkConfig().getJoin().getMulticastConfig().setMulticastGroup(configURI.getMulticastAddress()); } + // config.getMapConfig(configURI.getDomainName() + "/Endpoints").setBackupCount(0); + if (configURI.getRemotes().size() > 0) { TcpIpConfig tcpconfig = config.getNetworkConfig().getJoin().getJoinMembers(); tcpconfig.setEnabled(true); @@ -200,15 +212,15 @@ public class HazelcastEndpointRegistry implements EndpointRegistry, LifeCycleLis endpoint.setRemote(true); } // if (!entry.isPrimary()) { - ((RuntimeEndpoint) endpoint).bind(registry, this); + ((RuntimeEndpoint)endpoint).bind(registry, this); // } foundEndpoints.add(endpoint); logger.fine("Found endpoint with matching service - " + endpoint); - } + } // else the service name doesn't match } } - + return foundEndpoints; } @@ -252,46 +264,65 @@ public class HazelcastEndpointRegistry implements EndpointRegistry, LifeCycleLis } public void updateEndpoint(String uri, Endpoint endpoint) { -// // TODO: is updateEndpoint needed? -// throw new UnsupportedOperationException(); + // // TODO: is updateEndpoint needed? + // throw new UnsupportedOperationException(); + } + + public void entryAdded(EntryEvent event) { + entryAdded(event.getKey(), event.getValue()); } -// public void entryAdded(Object key, Object value) { -// MapEntry entry = (MapEntry)value; -// Endpoint newEp = (Endpoint)entry.getValue(); -// if (!isLocal(entry)) { -// logger.info(id + " Remote endpoint added: " + entry.getValue()); -// newEp.setRemote(true); -// } -// ((RuntimeEndpoint) newEp).bind(registry, this); -// for (EndpointListener listener : listeners) { -// listener.endpointAdded(newEp); -// } -// } -// -// public void entryRemoved(Object key, Object value) { -// MapEntry entry = (MapEntry)value; -// if (!isLocal(entry)) { -// logger.info(id + " Remote endpoint removed: " + entry.getValue()); -// } -// Endpoint oldEp = (Endpoint)entry.getValue(); -// for (EndpointListener listener : listeners) { -// listener.endpointRemoved(oldEp); -// } -// } -// -// public void entryUpdated(Object key, Object oldValue, Object newValue) { -// MapEntry oldEntry = (MapEntry)oldValue; -// MapEntry newEntry = (MapEntry)newValue; -// if (!isLocal(newEntry)) { -// logger.info(id + " Remote endpoint updated: " + newEntry.getValue()); -// } -// Endpoint oldEp = (Endpoint)oldEntry.getValue(); -// Endpoint newEp = (Endpoint)newEntry.getValue(); -// ((RuntimeEndpoint) newEp).bind(registry, this); -// for (EndpointListener listener : listeners) { -// listener.endpointUpdated(oldEp, newEp); -// } -// } + public void entryEvicted(EntryEvent event) { + // Should not happen + } + + public void entryRemoved(EntryEvent event) { + entryRemoved(event.getKey(), event.getValue()); + } + + public void entryUpdated(EntryEvent event) { + entryUpdated(event.getKey(), null, event.getValue()); + } + + public void entryAdded(Object key, Object value) { + Endpoint newEp = (Endpoint)value; + if (!isLocal(newEp)) { + logger.info(" Remote endpoint added: " + newEp); + newEp.setRemote(true); + } + ((RuntimeEndpoint)newEp).bind(registry, this); + for (EndpointListener listener : listeners) { + listener.endpointAdded(newEp); + } + } + + public void entryRemoved(Object key, Object value) { + Endpoint oldEp = (Endpoint)value; + if (!isLocal(oldEp)) { + logger.info(" Remote endpoint removed: " + value); + } + ((RuntimeEndpoint) oldEp).bind(registry, this); + for (EndpointListener listener : listeners) { + listener.endpointRemoved(oldEp); + } + } + + public void entryUpdated(Object key, Object oldValue, Object newValue) { + Endpoint oldEp = (Endpoint)oldValue; + Endpoint newEp = (Endpoint)newValue; + if (!isLocal(newEp)) { + logger.info(" Remote endpoint updated: " + newEp); + } + ((RuntimeEndpoint)newEp).bind(registry, this); + for (EndpointListener listener : listeners) { + listener.endpointUpdated(oldEp, newEp); + } + } + + public void memberAdded(MembershipEvent event) { + } + + public void memberRemoved(MembershipEvent event) { + } } diff --git a/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java new file mode 100644 index 0000000000..32e49dbe41 --- /dev/null +++ b/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.endpoint.hazelcast; + +import java.net.InetAddress; +import java.util.HashMap; +import java.util.Map; + +import org.apache.tuscany.sca.assembly.AssemblyFactory; +import org.apache.tuscany.sca.assembly.Binding; +import org.apache.tuscany.sca.assembly.Component; +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.assembly.SCABindingFactory; +import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.FactoryExtensionPoint; +import org.apache.tuscany.sca.runtime.EndpointListener; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import com.hazelcast.core.IMap; + +// Ignore so its not run in the build yet till its working +@Ignore("Hazelcast doesn't support the map entry management by members") +public class MultiRegTestCase implements EndpointListener { + private static ExtensionPointRegistry extensionPoints; + private static AssemblyFactory assemblyFactory; + private static SCABindingFactory scaBindingFactory; + + @BeforeClass + public static void init() { + extensionPoints = new DefaultExtensionPointRegistry(); + FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class); + assemblyFactory = factories.getFactory(AssemblyFactory.class); + scaBindingFactory = factories.getFactory(SCABindingFactory.class); + } + + @Test + public void testReplication() throws Exception { + RuntimeEndpoint ep1 = createEndpoint("ep1uri"); + + String host = InetAddress.getLocalHost().getHostAddress(); + String bind = null; // "9.65.158.31"; + String port1 = "8085"; + String port2 = "8086"; + String port3 = "8087"; + String range = "1"; + + Map attrs1 = new HashMap(); + // attrs1.put("nomcast", "true"); + attrs1.put("bind", bind); + attrs1.put("receiverPort", port1); + attrs1.put("receiverAutoBind", range); + // attrs1.put("routes", host + ":" + port2 + " " + host + ":" + port3); + HazelcastEndpointRegistry reg1 = new HazelcastEndpointRegistry(extensionPoints, attrs1, "tuscany:foo", "bar"); + reg1.addListener(this); + reg1.start(); + + Map attrs2 = new HashMap(); + // attrs2.put("nomcast", "true"); + attrs1.put("bind", bind); + attrs2.put("receiverPort", port2); + attrs2.put("receiverAutoBind", range); + // attrs2.put("routes", host + ":"+port1); + HazelcastEndpointRegistry reg2 = new HazelcastEndpointRegistry(extensionPoints, attrs2, "tuscany:foo", "bar"); + reg2.addListener(this); + reg2.start(); + + Map attrs3 = new HashMap(); + // attrs3.put("nomcast", "true"); + attrs1.put("bind", bind); + attrs3.put("receiverPort", port3); + attrs3.put("receiverAutoBind", range); + // attrs3.put("routes", host + ":"+port1); + HazelcastEndpointRegistry reg3 = new HazelcastEndpointRegistry(extensionPoints, attrs3, "tuscany:foo", "bar"); + reg3.addListener(this); + reg3.start(); + + ep1.bind(extensionPoints, reg1); + reg1.addEndpoint(ep1); + assertExists(reg1, "ep1uri"); + assertExists(reg2, "ep1uri"); + assertExists(reg3, "ep1uri"); + + RuntimeEndpoint ep2 = createEndpoint("ep2uri"); + ep2.bind(extensionPoints, reg2); + reg2.addEndpoint(ep2); + assertExists(reg2, "ep2uri"); + assertExists(reg1, "ep2uri"); + assertExists(reg3, "ep2uri"); + + System.out.println(((IMap)reg1.map).localKeySet().size()); + System.out.println(((IMap)reg2.map).localKeySet().size()); + System.out.println(((IMap)reg3.map).localKeySet().size()); + + reg1.stop(); + Thread.sleep(6000); + Assert.assertNull(reg2.getEndpoint("ep1uri")); + Assert.assertNull(reg3.getEndpoint("ep1uri")); + + System.out.println(((IMap)reg2.map).localKeySet().size()); + System.out.println(((IMap)reg3.map).localKeySet().size()); + + assertExists(reg2, "ep2uri"); + assertExists(reg3, "ep2uri"); + + reg1.start(); + ep1.bind(extensionPoints, reg1); + reg1.addEndpoint(ep1); + assertExists(reg1, "ep1uri"); + assertExists(reg2, "ep1uri"); + assertExists(reg3, "ep1uri"); + + reg1.stop(); + reg2.stop(); + reg3.stop(); + System.out.println(); // closed + } + + private Endpoint assertExists(HazelcastEndpointRegistry reg, String uri) throws InterruptedException { + Endpoint ep = null; + int count = 0; + while (ep == null && count < 15) { + ep = reg.getEndpoint(uri); + if (ep == null) { + Thread.sleep(1000); + System.out.println(reg + ": tries=" + count); + } + count++; + } + Assert.assertNotNull(ep); + Assert.assertEquals(uri, ep.getURI()); + return ep; + } + + private RuntimeEndpoint createEndpoint(String uri) { + RuntimeEndpoint ep = (RuntimeEndpoint) assemblyFactory.createEndpoint(); + Component comp = assemblyFactory.createComponent(); + ep.setComponent(comp); + ep.setService(assemblyFactory.createComponentService()); + Binding b = scaBindingFactory.createSCABinding(); + ep.setBinding(b); + ep.setURI(uri); + return ep; + } + + private void print(String prefix, Endpoint ep) { + System.out.println(prefix + ": "+ep); + } + + public void endpointAdded(Endpoint endpoint) { + print("Added", endpoint); + } + + public void endpointRemoved(Endpoint endpoint) { + print("Removed", endpoint); + } + + public void endpointUpdated(Endpoint oldEndpoint, Endpoint newEndpoint) { + print("Updated", newEndpoint); + } + +} diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java index 47b3450594..fe683af025 100644 --- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java +++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java @@ -708,7 +708,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb // [rfeng] Change the behavior to replicate to all nodes if (entry.isPrimary() && self.equals(entry.getPrimary())) { try { - Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes()); + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); entry.setBackupNodes(backup); entry.setPrimary(self); } catch (ChannelException x) { @@ -768,7 +768,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb if (log.isDebugEnabled()) log.debug("[1] Primary choosing a new backup"); try { - Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes()); + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); entry.setBackupNodes(backup); entry.setPrimary(channel.getLocalMember(false)); } catch (ChannelException x) { @@ -798,7 +798,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb entry.setPrimary(channel.getLocalMember(false)); entry.setBackup(false); entry.setProxy(false); - Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes()); + Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue()); entry.setBackupNodes(backup); if (mapOwner != null) mapOwner.objectMadePrimay(entry.getKey(), entry.getValue()); @@ -833,7 +833,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb return members[node]; } - protected abstract Member[] publishEntryInfo(Object key, Object value, Member[] backupNodes) throws ChannelException; + protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException; public void heartbeat() { try { @@ -916,7 +916,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb } if (entry.isBackup()) { //select a new backup node - backup = publishEntryInfo(key, entry.getValue(), entry.getBackupNodes()); + backup = publishEntryInfo(key, entry.getValue()); } else if (entry.isProxy()) { //invalidate the previous primary msg = @@ -997,7 +997,7 @@ public abstract class AbstractReplicatedMap extends MapStore implements RpcCallb old = remove(key); try { if (notify) { - Member[] backup = publishEntryInfo(key, value, entry.getBackupNodes()); + Member[] backup = publishEntryInfo(key, value); entry.setBackupNodes(backup); } } catch (ChannelException x) { diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java index 0c2144ea49..42d2eda6a3 100644 --- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java +++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java @@ -38,11 +38,13 @@ import java.util.logging.Logger; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelReceiver; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.group.GroupChannel; import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor; import org.apache.catalina.tribes.membership.McastService; import org.apache.catalina.tribes.membership.StaticMember; +import org.apache.catalina.tribes.transport.ReceiverBase; import org.apache.tuscany.sca.assembly.Endpoint; import org.apache.tuscany.sca.assembly.EndpointReference; import org.apache.tuscany.sca.core.ExtensionPointRegistry; @@ -67,6 +69,9 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi private String address = MULTICAST_ADDRESS; private String bind = null; private int timeout = 50; + private int receiverPort = 4000; + private int receiverAutoBind = 100; + private List staticRoutes; private final static String DEFAULT_DOMAIN_URI = "http://tuscany.apache.org/sca/1.1/domains/default"; private String domainURI = DEFAULT_DOMAIN_URI; @@ -75,7 +80,6 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi private ExtensionPointRegistry registry; private ReplicatedMap map; - private static List staticRoutes; private String id; private boolean noMultiCast; @@ -175,6 +179,14 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi if (mcast != null) { noMultiCast = Boolean.valueOf(mcast); } + String recvPort = attributes.get("receiverPort"); + if (recvPort != null) { + receiverPort = Integer.parseInt(recvPort); + } + String recvAutoBind = attributes.get("receiverAutoBind"); + if (recvAutoBind != null) { + receiverAutoBind = Integer.parseInt(recvAutoBind); + } } public void start() { @@ -190,9 +202,24 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi if (noMultiCast) { map.getChannel().addInterceptor(new DisableMcastInterceptor()); } - - // http://www.mail-archive.com/users@tomcat.apache.org/msg24873.html - int port = channel.getChannelReceiver().getPort(); + + // Configure the receiver ports + ChannelReceiver receiver = channel.getChannelReceiver(); + if (receiver instanceof ReceiverBase) { + ((ReceiverBase)receiver).setAutoBind(receiverAutoBind); + ((ReceiverBase)receiver).setPort(receiverPort); + } + + /* + Object sender = channel.getChannelSender(); + if (sender instanceof ReplicationTransmitter) { + sender = ((ReplicationTransmitter)sender).getTransport(); + } + if (sender instanceof AbstractSender) { + ((AbstractSender)sender).setKeepAliveCount(0); + ((AbstractSender)sender).setMaxRetryAttempts(5); + } + */ if (staticRoutes != null) { StaticMembershipInterceptor smi = new StaticMembershipInterceptor(); @@ -200,12 +227,12 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi Member member; try { // The port has to match the receiver port - member = new StaticMember(staticRoute.getHost(), port, 5000); + member = new StaticMember(staticRoute.getHost(), staticRoute.getPort(), 5000); } catch (IOException e) { throw new RuntimeException(e); } smi.addStaticMember(member); - logger.info("Added static route: " + staticRoute.getHost() + ":" + port); + logger.info("Added static route: " + staticRoute.getHost() + ":" + staticRoute.getPort()); } smi.setLocalMember(map.getChannel().getLocalMember(false)); map.getChannel().addInterceptor(smi); @@ -410,6 +437,7 @@ public class ReplicatedEndpointRegistry implements EndpointRegistry, LifeCycleLi logger.info(id + " Remote endpoint removed: " + entry.getValue()); } Endpoint oldEp = (Endpoint)entry.getValue(); + ((RuntimeEndpoint) oldEp).bind(registry, this); for (EndpointListener listener : listeners) { listener.endpointRemoved(oldEp); } diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java index 762407604d..669ad82192 100644 --- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java +++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java @@ -17,6 +17,8 @@ package org.apache.tuscany.sca.endpoint.tribes; import java.io.Serializable; +import java.util.Iterator; +import java.util.Map; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; @@ -97,29 +99,65 @@ public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback, * @return Member - the backup node * @throws ChannelException */ - protected Member[] publishEntryInfo(Object key, Object value, Member[] backupNodes) throws ChannelException { + protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException { if (!(key instanceof Serializable && value instanceof Serializable)) return new Member[0]; //select a backup node - Member[] backup = getMapMembers(); + Member[] members = getMapMembers(); - if (backup == null || backup.length == 0) - return null; - - // Set the receivers to these members that are not in the backup nodes yet - Member[] members = backup; - if (backupNodes != null) { - members = getMapMembersExcl(backupNodes); + if (members == null || members.length == 0) { + return new Member[0]; } //publish the data out to all nodes MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable)key, (Serializable)value, - null, channel.getLocalMember(false), backup); + null, channel.getLocalMember(false), members); getChannel().send(members, msg, getChannelSendOptions()); - return backup; + return members; + } + + /** + * Override the base method to look up existing entries only + */ + public Object get(Object key) { + MapEntry entry = super.getInternal(key); + if (log.isTraceEnabled()) + log.trace("Requesting id:" + key + " entry:" + entry); + if (entry == null) { + return null; + } + return entry.getValue(); } + /** + * Override the base method to remove all entries owned by the member that disappeared + */ + public void memberDisappeared(Member member) { + boolean removed = false; + synchronized (mapMembers) { + removed = (mapMembers.remove(member) != null); + if (!removed) { + if (log.isDebugEnabled()) + log.debug("Member[" + member + "] disappeared, but was not present in the map."); + return; //the member was not part of our map. + } + } + + Iterator> i = super.entrySetFull().iterator(); + while (i.hasNext()) { + Map.Entry e = i.next(); + MapEntry entry = (MapEntry)super.getInternal(e.getKey()); + if (entry == null) { + continue; + } + if (member.equals(entry.getPrimary())) { + if (log.isDebugEnabled()) + log.debug("[2] Primary disappeared"); + i.remove(); + } //end if + } //while + } } diff --git a/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java b/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java index d329ebd066..a470c47ba0 100644 --- a/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java +++ b/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java @@ -19,6 +19,7 @@ package org.apache.tuscany.sca.endpoint.tribes; +import java.net.InetAddress; import java.util.HashMap; import java.util.Map; @@ -28,109 +29,141 @@ import org.apache.tuscany.sca.assembly.Component; import org.apache.tuscany.sca.assembly.Endpoint; import org.apache.tuscany.sca.assembly.SCABindingFactory; import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.core.FactoryExtensionPoint; +import org.apache.tuscany.sca.runtime.EndpointListener; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; import org.junit.Assert; -import org.junit.Ignore; +import org.junit.BeforeClass; import org.junit.Test; // Ignore so its not run in the build yet till its working -@Ignore -public class MultiRegTestCase { - -// @Test -// public void testTwoNodesMultiCast() throws InterruptedException { -// DefaultExtensionPointRegistry extensionPoints = new DefaultExtensionPointRegistry(); -// FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class); -// AssemblyFactory assemblyFactory = factories.getFactory(AssemblyFactory.class); -// -// ReplicatedEndpointRegistry reg1 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar"); -// reg1.start(); -// -// Endpoint ep1 = assemblyFactory.createEndpoint(); -// Component comp = assemblyFactory.createComponent(); -// ep1.setComponent(comp); -// ep1.setService(assemblyFactory.createComponentService()); -// Binding b = new SCABindingFactoryImpl().createSCABinding(); -// ep1.setBinding(b); -// ep1.setURI("ep1uri"); -// reg1.addEndpoint(ep1); -// -// Endpoint ep1p = reg1.getEndpoint("ep1uri"); -// Assert.assertNotNull(ep1p); -// Assert.assertEquals("ep1uri", ep1p.getURI()); -// -// ReplicatedEndpointRegistry reg2 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar"); -// reg2.start(); -// Thread.sleep(5000); -// -// Endpoint ep1p2 = reg2.getEndpoint("ep1uri"); -// Assert.assertNotNull(ep1p2); -// Assert.assertEquals("ep1uri", ep1p2.getURI()); -// -// reg1.stop(); -// reg2.stop(); -// } +public class MultiRegTestCase implements EndpointListener { + private static ExtensionPointRegistry extensionPoints; + private static AssemblyFactory assemblyFactory; + private static SCABindingFactory scaBindingFactory; - @Test - public void testTwoNodesStaticNoMultiCast() throws InterruptedException { - DefaultExtensionPointRegistry extensionPoints = new DefaultExtensionPointRegistry(); + @BeforeClass + public static void init() { + extensionPoints = new DefaultExtensionPointRegistry(); FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class); - AssemblyFactory assemblyFactory = factories.getFactory(AssemblyFactory.class); + assemblyFactory = factories.getFactory(AssemblyFactory.class); + scaBindingFactory = factories.getFactory(SCABindingFactory.class); + } + + @Test + public void testReplication() throws Exception { + RuntimeEndpoint ep1 = createEndpoint("ep1uri"); + + String host = InetAddress.getLocalHost().getHostAddress(); + String bind = null; // "9.65.158.31"; + String port1 = "8085"; + String port2 = "8086"; + String port3 = "8087"; + String range = "1"; Map attrs1 = new HashMap(); - attrs1.put("nomcast", "true"); - attrs1.put("routes", "9.167.197.91:4001 9.167.197.91:4002"); + // attrs1.put("nomcast", "true"); + attrs1.put("bind", bind); + attrs1.put("receiverPort", port1); + attrs1.put("receiverAutoBind", range); + // attrs1.put("routes", host + ":" + port2 + " " + host + ":" + port3); ReplicatedEndpointRegistry reg1 = new ReplicatedEndpointRegistry(extensionPoints, attrs1, "foo", "bar"); + reg1.addListener(this); reg1.start(); - Endpoint ep1 = assemblyFactory.createEndpoint(); - Component comp = assemblyFactory.createComponent(); - ep1.setComponent(comp); - ep1.setService(assemblyFactory.createComponentService()); - Binding b = factories.getFactory(SCABindingFactory.class).createSCABinding(); - ep1.setBinding(b); - ep1.setURI("ep1uri"); - reg1.addEndpoint(ep1); - - Endpoint ep1p = reg1.getEndpoint("ep1uri"); - Assert.assertNotNull(ep1p); - Assert.assertEquals("ep1uri", ep1p.getURI()); - Map attrs2 = new HashMap(); - attrs2.put("nomcast", "true"); - attrs2.put("routes", "9.167.197.91:4000"); + // attrs2.put("nomcast", "true"); + attrs1.put("bind", bind); + attrs2.put("receiverPort", port2); + attrs2.put("receiverAutoBind", range); + // attrs2.put("routes", host + ":"+port1); ReplicatedEndpointRegistry reg2 = new ReplicatedEndpointRegistry(extensionPoints, attrs2, "foo", "bar"); + reg2.addListener(this); reg2.start(); - - System.out.println("wait"); - Thread.sleep(10000); - System.out.println("run"); - - Endpoint ep1p2 = reg2.getEndpoint("ep1uri"); - Assert.assertNotNull(ep1p2); - Assert.assertEquals("ep1uri", ep1p2.getURI()); Map attrs3 = new HashMap(); - attrs3.put("nomcast", "true"); - attrs3.put("routes", "9.167.197.91:4000"); + // attrs3.put("nomcast", "true"); + attrs1.put("bind", bind); + attrs3.put("receiverPort", port3); + attrs3.put("receiverAutoBind", range); + // attrs3.put("routes", host + ":"+port1); ReplicatedEndpointRegistry reg3 = new ReplicatedEndpointRegistry(extensionPoints, attrs3, "foo", "bar"); + reg3.addListener(this); reg3.start(); - - System.out.println("wait"); - Thread.sleep(5000); - System.out.println("run"); - Endpoint ep1p3 = reg3.getEndpoint("ep1uri"); - Assert.assertNotNull(ep1p3); - Assert.assertEquals("ep1uri", ep1p3.getURI()); + ep1.bind(extensionPoints, reg1); + reg1.addEndpoint(ep1); + assertExists(reg1, "ep1uri"); + assertExists(reg2, "ep1uri"); + assertExists(reg3, "ep1uri"); + + RuntimeEndpoint ep2 = createEndpoint("ep2uri"); + ep2.bind(extensionPoints, reg2); + reg2.addEndpoint(ep2); + assertExists(reg2, "ep2uri"); + assertExists(reg1, "ep2uri"); + assertExists(reg3, "ep2uri"); + reg1.stop(); + Thread.sleep(6000); + Assert.assertNull(reg2.getEndpoint("ep1uri")); + Assert.assertNull(reg3.getEndpoint("ep1uri")); + assertExists(reg2, "ep2uri"); + assertExists(reg3, "ep2uri"); + + reg1.start(); + ep1.bind(extensionPoints, reg1); + reg1.addEndpoint(ep1); + assertExists(reg1, "ep1uri"); + assertExists(reg2, "ep1uri"); + assertExists(reg3, "ep1uri"); - System.out.println("wait2"); - Thread.sleep(5000); - System.out.println("end"); reg1.stop(); reg2.stop(); reg3.stop(); + System.out.println(); // closed + } + + private Endpoint assertExists(ReplicatedEndpointRegistry reg, String uri) throws InterruptedException { + Endpoint ep = null; + int count = 0; + while (ep == null && count < 15) { + ep = reg.getEndpoint(uri); + Thread.sleep(1000); + count++; + System.out.println(reg + ": tries=" + count); + } + Assert.assertNotNull(ep); + Assert.assertEquals(uri, ep.getURI()); + return ep; + } + + private RuntimeEndpoint createEndpoint(String uri) { + RuntimeEndpoint ep = (RuntimeEndpoint) assemblyFactory.createEndpoint(); + Component comp = assemblyFactory.createComponent(); + ep.setComponent(comp); + ep.setService(assemblyFactory.createComponentService()); + Binding b = scaBindingFactory.createSCABinding(); + ep.setBinding(b); + ep.setURI(uri); + return ep; + } + + private void print(String prefix, Endpoint ep) { + System.out.println(prefix + ": "+ep); + } + + public void endpointAdded(Endpoint endpoint) { + print("Added", endpoint); + } + + public void endpointRemoved(Endpoint endpoint) { + print("Removed", endpoint); + } + + public void endpointUpdated(Endpoint oldEndpoint, Endpoint newEndpoint) { + print("Updated", newEndpoint); } } diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointHelper.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointHelper.java index 949d2d8af7..8ecc5f7ea8 100644 --- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointHelper.java +++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointHelper.java @@ -69,6 +69,30 @@ public class EndpointHelper { if (serviceID != null) { props.put(RemoteConstants.ENDPOINT_SERVICE_ID, Long.parseLong(serviceID)); } + + // FIXME: [rfeng] We need to calculate the intents supported by this endpoint + /* + QName bindingTypeName = endpoint.getBinding().getType(); + Definitions definitions = null; + if(definitions!=null) { + for(BindingType bindingType: definitions.getBindingTypes()) { + if(bindingType.getType().equals(bindingTypeName)) { + bindingType.getAlwaysProvidedIntents(); + } + } + */ + + String intents = (String)props.get(RemoteConstants.SERVICE_EXPORTED_INTENTS); + String extraIntents = (String)props.get(RemoteConstants.SERVICE_EXPORTED_INTENTS_EXTRA); + if (intents == null) { + intents = ""; + } + if (extraIntents != null) { + intents = intents + " " + extraIntents; + } + + props.put(RemoteConstants.SERVICE_INTENTS, intents.trim()); + props.put(RemoteConstants.ENDPOINT_ID, endpoint.getURI()); // FIXME: [rfeng] How to pass in the remote service id from the endpoint XML props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, new String[] {"org.osgi.sca"}); diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java index 608c74bcfc..f4521cfbfc 100644 --- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java +++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java @@ -27,6 +27,12 @@ import java.util.Hashtable; import java.util.List; import java.util.Map; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.UtilityExtensionPoint; +import org.apache.tuscany.sca.definitions.Definitions; +import org.apache.tuscany.sca.deployment.Deployer; +import org.apache.tuscany.sca.policy.BindingType; +import org.apache.tuscany.sca.policy.Intent; import org.osgi.framework.Bundle; import org.osgi.framework.BundleContext; import org.osgi.framework.Constants; @@ -73,10 +79,28 @@ public class RemoteServiceAdminImpl implements RemoteServiceAdmin, ManagedServic importer.start(); Hashtable props = new Hashtable(); props.put(RemoteConstants.REMOTE_CONFIGS_SUPPORTED, new String[] {"org.osgi.sca"}); + + ExtensionPointRegistry registry = exporter.getExtensionPointRegistry(); + UtilityExtensionPoint utilities = registry.getExtensionPoint(UtilityExtensionPoint.class); + Deployer deployer = utilities.getUtility(Deployer.class); + Definitions definitions = deployer.getSystemDefinitions(); + + String[] intents = new String[definitions.getIntents().size()]; + int i = 0; + for (Intent intent : definitions.getIntents()) { + intents[i++] = intent.toString(); + } + + String[] bindingTypes = new String[definitions.getBindingTypes().size()]; + i = 0; + for (BindingType bindingType : definitions.getBindingTypes()) { + bindingTypes[i++] = bindingType.getType().toString(); + } + // FIXME: We should ask SCA domain for the supported intents - props.put(RemoteConstants.REMOTE_INTENTS_SUPPORTED, new String[] {}); + props.put(RemoteConstants.REMOTE_INTENTS_SUPPORTED, intents); // FIXME: We should ask SCA domain for the supported binding types - props.put("org.osgi.sca.binding.types", new String[] {}); + props.put("org.osgi.sca.binding.types", bindingTypes); registration = context.registerService(RemoteServiceAdmin.class.getName(), this, props); props = new Hashtable(); diff --git a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java index f6e9855556..a7c6d04ee9 100644 --- a/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java +++ b/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java @@ -53,6 +53,19 @@ public class DomainDiscoveryService extends AbstractDiscoveryService implements this.domainRegistryFactory = registry.getExtensionPoint(UtilityExtensionPoint.class).getUtility(DomainRegistryFactory.class); domainRegistryFactory.addListener(this); + + // [rfeng] Starting of the endpoint registry takes a long time and it leaves the bundle + // state to be starting. When the registry is started, remote endpoints come in and that + // triggers the classloading from this bundle. + Thread thread = new Thread() { + public void run() { + startEndpointRegistry(); + } + }; + thread.start(); + } + + private void startEndpointRegistry() { // The following code forced the start() of the domain registry in absense of services String domainRegistry = context.getProperty("org.osgi.sca.domain.registry"); if (domainRegistry == null) { @@ -103,6 +116,8 @@ public class DomainDiscoveryService extends AbstractDiscoveryService implements { // Notify the endpoint listeners EndpointDescription description = createEndpointDescription(bundleContext, endpoint); + // Set the owning bundle to runtime bundle to avoid NPE + servicesInfo.put(description, context.getBundle()); endpointChanged(description, ADDED); } } @@ -115,6 +130,7 @@ public class DomainDiscoveryService extends AbstractDiscoveryService implements */ { EndpointDescription description = createEndpointDescription(context, endpoint); + servicesInfo.remove(description); endpointChanged(description, REMOVED); } } -- cgit v1.2.3