From 767a139887e1fa949c50bbbe263c72d2b5080748 Mon Sep 17 00:00:00 2001 From: antelder Date: Mon, 16 May 2011 07:31:16 +0000 Subject: Rename hazelcast registry packages to match the new DomainRegistry naming git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1103632 13f79535-47bb-0310-9956-ffa450edef68 --- .../sca/binding/hazelcast/ReferenceInvoker.java | 2 +- .../hazelcast/HazelcastDomainRegistry.java | 642 --------------------- .../hazelcast/HazelcastDomainRegistryFactory.java | 49 -- .../sca/endpoint/hazelcast/RegistryConfig.java | 177 ------ .../HazelcastClientDomainRegistryFactory.java | 49 -- .../client/HazelcastClientEndpointRegistry.java | 130 ----- .../hazelcast/HazelcastDomainRegistry.java | 642 +++++++++++++++++++++ .../hazelcast/HazelcastDomainRegistryFactory.java | 49 ++ .../sca/registry/hazelcast/RegistryConfig.java | 177 ++++++ .../HazelcastClientDomainRegistryFactory.java | 49 ++ .../client/HazelcastClientEndpointRegistry.java | 130 +++++ ...pache.tuscany.sca.runtime.DomainRegistryFactory | 4 +- .../sca/endpoint/hazelcast/MultiRegTestCase.java | 217 ------- .../sca/endpoint/hazelcast/RegistryTestCase.java | 142 ----- .../sca/registry/hazelcast/MultiRegTestCase.java | 218 +++++++ .../sca/registry/hazelcast/RegistryTestCase.java | 142 +++++ 16 files changed, 1410 insertions(+), 1409 deletions(-) delete mode 100644 sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastDomainRegistry.java delete mode 100644 sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastDomainRegistryFactory.java delete mode 100644 sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/RegistryConfig.java delete mode 100644 sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/client/HazelcastClientDomainRegistryFactory.java delete mode 100644 sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/client/HazelcastClientEndpointRegistry.java create mode 100644 sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/registry/hazelcast/HazelcastDomainRegistry.java create mode 100644 sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/registry/hazelcast/HazelcastDomainRegistryFactory.java create mode 100644 sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/registry/hazelcast/RegistryConfig.java create mode 100644 sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/regsitry/hazelcast/client/HazelcastClientDomainRegistryFactory.java create mode 100644 sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/regsitry/hazelcast/client/HazelcastClientEndpointRegistry.java delete mode 100644 sca-java-2.x/trunk/modules/domain-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java delete mode 100644 sca-java-2.x/trunk/modules/domain-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/RegistryTestCase.java create mode 100644 sca-java-2.x/trunk/modules/domain-hazelcast/src/test/java/org/apache/tuscany/sca/registry/hazelcast/MultiRegTestCase.java create mode 100644 sca-java-2.x/trunk/modules/domain-hazelcast/src/test/java/org/apache/tuscany/sca/registry/hazelcast/RegistryTestCase.java diff --git a/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/binding/hazelcast/ReferenceInvoker.java b/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/binding/hazelcast/ReferenceInvoker.java index 9bf5f4c21a..1585684691 100644 --- a/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/binding/hazelcast/ReferenceInvoker.java +++ b/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/binding/hazelcast/ReferenceInvoker.java @@ -29,12 +29,12 @@ import javax.xml.namespace.QName; import org.apache.tuscany.sca.common.xml.dom.DOMHelper; import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.core.FactoryExtensionPoint; -import org.apache.tuscany.sca.endpoint.hazelcast.HazelcastDomainRegistry; import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.interfacedef.util.FaultException; import org.apache.tuscany.sca.invocation.Invoker; import org.apache.tuscany.sca.invocation.Message; import org.apache.tuscany.sca.invocation.MessageFactory; +import org.apache.tuscany.sca.registry.hazelcast.HazelcastDomainRegistry; import org.apache.tuscany.sca.runtime.DomainRegistryFactory; import org.apache.tuscany.sca.runtime.DomainRegistry; import org.apache.tuscany.sca.runtime.ExtensibleDomainRegistryFactory; diff --git a/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastDomainRegistry.java b/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastDomainRegistry.java deleted file mode 100644 index 7d65a5fe57..0000000000 --- a/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastDomainRegistry.java +++ /dev/null @@ -1,642 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.endpoint.hazelcast; - -import java.io.ByteArrayOutputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.StringReader; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.wsdl.Definition; -import javax.wsdl.WSDLException; -import javax.wsdl.xml.WSDLReader; -import javax.wsdl.xml.WSDLWriter; -import javax.xml.namespace.QName; -import javax.xml.stream.XMLStreamException; -import javax.xml.stream.XMLStreamReader; - -import org.apache.tuscany.sca.assembly.AssemblyFactory; -import org.apache.tuscany.sca.assembly.Composite; -import org.apache.tuscany.sca.assembly.Endpoint; -import org.apache.tuscany.sca.common.xml.stax.StAXHelper; -import org.apache.tuscany.sca.contribution.processor.ContributionReadException; -import org.apache.tuscany.sca.contribution.processor.ContributionWriteException; -import org.apache.tuscany.sca.contribution.processor.ExtensibleStAXArtifactProcessor; -import org.apache.tuscany.sca.contribution.processor.ProcessorContext; -import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessorExtensionPoint; -import org.apache.tuscany.sca.core.ExtensionPointRegistry; -import org.apache.tuscany.sca.core.FactoryExtensionPoint; -import org.apache.tuscany.sca.core.LifeCycleListener; -import org.apache.tuscany.sca.core.UtilityExtensionPoint; -import org.apache.tuscany.sca.interfacedef.InterfaceContract; -import org.apache.tuscany.sca.interfacedef.wsdl.WSDLDefinition; -import org.apache.tuscany.sca.interfacedef.wsdl.WSDLFactory; -import org.apache.tuscany.sca.interfacedef.wsdl.WSDLInterface; -import org.apache.tuscany.sca.interfacedef.wsdl.WSDLInterfaceContract; -import org.apache.tuscany.sca.runtime.BaseDomainRegistry; -import org.apache.tuscany.sca.runtime.ContributionListener; -import org.apache.tuscany.sca.runtime.DomainRegistry; -import org.apache.tuscany.sca.runtime.InstalledContribution; -import org.apache.tuscany.sca.runtime.RuntimeEndpoint; -import org.apache.tuscany.sca.runtime.RuntimeProperties; -import org.oasisopen.sca.ServiceRuntimeException; -import org.xml.sax.InputSource; - -import com.hazelcast.config.Config; -import com.hazelcast.config.NearCacheConfig; -import com.hazelcast.config.TcpIpConfig; -import com.hazelcast.config.XmlConfigBuilder; -import com.hazelcast.core.EntryEvent; -import com.hazelcast.core.EntryListener; -import com.hazelcast.core.Hazelcast; -import com.hazelcast.core.HazelcastInstance; -import com.hazelcast.core.ILock; -import com.hazelcast.core.IMap; -import com.hazelcast.core.Member; -import com.hazelcast.core.MembershipEvent; -import com.hazelcast.core.MembershipListener; -import com.hazelcast.core.MultiMap; -import com.hazelcast.core.Transaction; -import com.hazelcast.nio.Address; - -/** - * An DomainRegistry using a Hazelcast - */ -public class HazelcastDomainRegistry extends BaseDomainRegistry implements DomainRegistry, LifeCycleListener, EntryListener, MembershipListener { - private final static Logger logger = Logger.getLogger(HazelcastDomainRegistry.class.getName()); - - private HazelcastInstance hazelcastInstance; - - protected Map endpointMap; - protected MultiMap endpointOwners; - protected Map> runningComposites; - protected Map>> runningCompositeOwners; - - protected Map endpointWsdls; - protected Map localEndpoints = new ConcurrentHashMap(); - - protected Map installedContributions; - - protected AssemblyFactory assemblyFactory; - protected Object shutdownMutex = new Object(); - protected Properties properties; - - public HazelcastDomainRegistry(ExtensionPointRegistry registry, Properties properties, String endpointRegistryURI, String domainURI) { - super(registry, null, endpointRegistryURI, domainURI); - this.assemblyFactory = registry.getExtensionPoint(FactoryExtensionPoint.class).getFactory(AssemblyFactory.class); - this.properties = properties; - } - - public HazelcastDomainRegistry(ExtensionPointRegistry registry, - Map attributes, - String domainRegistryURI, - String domainURI) { - super(registry, attributes, domainRegistryURI, domainURI); - this.assemblyFactory = registry.getExtensionPoint(FactoryExtensionPoint.class).getFactory(AssemblyFactory.class); - this.properties = registry.getExtensionPoint(UtilityExtensionPoint.class).getUtility(RuntimeProperties.class).getProperties(); - } - - public HazelcastInstance getHazelcastInstance() { - return hazelcastInstance; - } - - public void start() { - if (endpointMap != null) { - throw new IllegalStateException("The registry has already been started"); - } -// if (configURI.toString().startsWith("tuscany:vm:")) { -// endpointMap = new HashMap(); -// } else { - initHazelcastInstance(); - IMap imap = hazelcastInstance.getMap(domainURI + "/Endpoints"); - imap.addEntryListener(this, true); - endpointMap = imap; - - endpointOwners = hazelcastInstance.getMultiMap(domainURI + "/EndpointOwners"); - endpointWsdls = hazelcastInstance.getMap(domainURI + "/EndpointWsdls"); - - runningComposites = hazelcastInstance.getMap(domainURI + "/RunningComposites"); - runningCompositeOwners = hazelcastInstance.getMap(domainURI + "/RunningCompositeOwners"); - - installedContributions = hazelcastInstance.getMap(domainURI + "/InstalledContributions"); - ((IMap)installedContributions).addEntryListener(new EntryListener() { - public void entryAdded(EntryEvent event) { - } - public void entryRemoved(EntryEvent event) { - for (ContributionListener listener : contributionlisteners) { - listener.contributionRemoved(event.getKey()); - } - } - public void entryUpdated(EntryEvent event) { - for (ContributionListener listener : contributionlisteners) { - listener.contributionUpdated(event.getKey()); - } - } - public void entryEvicted(EntryEvent event) { - } - }, false); - - hazelcastInstance.getCluster().addMembershipListener(this); -// } - } - - public void stop() { - if (hazelcastInstance != null) { - synchronized (shutdownMutex) { - hazelcastInstance.shutdown(); - hazelcastInstance = null; - endpointMap = null; - endpointOwners = null; - endpointWsdls = null; - runningComposites = null; - runningCompositeOwners = null; - } - } - } - - private void initHazelcastInstance() { - - // Hazelcast is outputs a lot on info level log messages which are unnecessary for us, - // so disable info logging for hazelcast client classes unless fine logging is on for tuscany. - if (!logger.isLoggable(Level.CONFIG)) { - Logger hzl = Logger.getLogger("com.hazelcast"); - if (!hzl.isLoggable(Level.FINE)) { - hzl.setLevel(Level.WARNING); - // we want the ClusterManager info messages so we can see nodes come and go - Logger.getLogger("com.hazelcast.cluster.ClusterManager").setLevel(Level.INFO); - // we don't want any of the XmlConfigBuilder warnings as set the config programatically - Logger.getLogger("com.hazelcast.config.XmlConfigBuilder").setLevel(Level.SEVERE); - } - } - - Config config = getHazelcastConfig(); - - // do this when theres a way to have adders be the key owners - // config.getMapConfig(configURI.getDomainName() + "/Endpoints").setBackupCount(0); - - // this caches reads locally - config.getMapConfig("default").setNearCacheConfig(new NearCacheConfig(0, 0, "NONE", 0, true)); - - // Disable the Hazelcast shutdown hook as Tuscany has its own and with both there are race conditions - config.setProperty("hazelcast.shutdownhook.enabled", - // GroupProperties.PROP_SHUTDOWNHOOK_ENABLED, - "false"); - - // By default this is 5 seconds, not sure what the implications are but dropping it down to 1 makes - // things like the samples look much faster - config.setProperty("hazelcast.wait.seconds.before.join", - // GroupProperties.PROP_WAIT_SECONDS_BEFORE_JOIN, - "1"); - - this.hazelcastInstance = Hazelcast.newHazelcastInstance(config); - if (logger.isLoggable(Level.INFO)) { - logger.info("started node in domain '" + domainURI + "' + at: " + hazelcastInstance.getCluster().getLocalMember().getInetSocketAddress()); - } - } - - protected Config getHazelcastConfig() { - Config config; - this.properties = registry.getExtensionPoint(UtilityExtensionPoint.class).getUtility(RuntimeProperties.class).getProperties(); - String configFile = properties.getProperty("hazelcastConfig"); - if (configFile != null) { - try { - config = new XmlConfigBuilder(configFile).build(); - } catch (FileNotFoundException e) { - throw new IllegalArgumentException(configFile, e); - } - } else { - // TUSCANY-3675 - domainRegistryURI properties don't seem to be copied into the - // properties collection anywhere - config = new XmlConfigBuilder().build(); - RegistryConfig rc = RegistryConfig.parseConfigURI(domainRegistryURI); - config.setPort(rc.getBindPort()); - //config.setPortAutoIncrement(false); - - if (!rc.getBindAddress().equals("*")) { - config.getNetworkConfig().getInterfaces().setEnabled(true); - config.getNetworkConfig().getInterfaces().clear(); - config.getNetworkConfig().getInterfaces().addInterface(rc.getBindAddress()); - } - - config.getGroupConfig().setName(rc.getUserid()); - config.getGroupConfig().setPassword(rc.getPassword()); - - if (rc.isMulticastDisabled()) { - config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false); - } else { - config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(true); - config.getNetworkConfig().getJoin().getMulticastConfig().setMulticastPort(rc.getMulticastPort()); - config.getNetworkConfig().getJoin().getMulticastConfig().setMulticastGroup(rc.getMulticastAddress()); - } - - if (rc.getWKAs().size() > 0) { - TcpIpConfig tcpconfig = config.getNetworkConfig().getJoin().getTcpIpConfig(); - tcpconfig.setEnabled(true); - List
lsMembers = tcpconfig.getAddresses(); - lsMembers.clear(); - for (String addr : rc.getWKAs()) { - String[] ipNPort = addr.split(":"); - try { - lsMembers.add(new Address(ipNPort[0], Integer.parseInt(ipNPort[1]))); - } catch (UnknownHostException e) { - throw new RuntimeException(e); - } - } - } - } - return config; - } - - public void addEndpoint(Endpoint endpoint) { - if (findEndpoint(endpoint.getURI()).size() > 0) { - Member m = getOwningMember(endpoint.getURI()); - throw new IllegalStateException("Endpoint " + endpoint.getURI() + " already exists in domain " + domainURI + " at " + m.getInetSocketAddress()); - } - - String localMemberAddr = hazelcastInstance.getCluster().getLocalMember().getInetSocketAddress().toString(); - String endpointURI = endpoint.getURI(); - String wsdl = getWsdl(endpoint); - Transaction txn = hazelcastInstance.getTransaction(); - txn.begin(); - try { - localEndpoints.put(endpointURI, endpoint); - endpointMap.put(endpointURI, endpoint); - endpointWsdls.put(endpointURI, wsdl); - endpointOwners.put(localMemberAddr, endpointURI); - txn.commit(); - } catch (Throwable e) { - txn.rollback(); - throw new ServiceRuntimeException(e); - } - logger.info("Add endpoint - " + endpoint); - } - - private String getWsdl(Endpoint endpoint) { - WSDLInterfaceContract wsdlIC = (WSDLInterfaceContract)((RuntimeEndpoint)endpoint).getGeneratedWSDLContract(endpoint.getComponentServiceInterfaceContract()); - if (wsdlIC == null) { - return ""; - } - WSDLInterface wsdl = (WSDLInterface)wsdlIC.getInterface(); - WSDLDefinition d = wsdl.getWsdlDefinition(); - ByteArrayOutputStream outStream = new ByteArrayOutputStream(); - try { - WSDLWriter writer = javax.wsdl.factory.WSDLFactory.newInstance().newWSDLWriter(); - writer.writeWSDL(d.getDefinition(), outStream); - } catch (Exception e){ - throw new RuntimeException(e); - } - return outStream.toString(); - } - - public List findEndpoint(String uri) { - List foundEndpoints = new ArrayList(); - for (Object v : endpointMap.values()) { - Endpoint endpoint = (Endpoint)v; - logger.fine("Matching against - " + endpoint); - if (endpoint.matches(uri)) { - endpoint = localizeEndpoint(endpoint); - foundEndpoints.add(endpoint); - logger.fine("Found endpoint with matching service - " + endpoint); - } - } - return foundEndpoints; - } - - private Endpoint localizeEndpoint(Endpoint endpoint) { - if (endpoint == null) return null; - if (!isLocal(endpoint)) { - endpoint.setRemote(true); - ((RuntimeEndpoint)endpoint).bind(registry, this); - try { - setNormailizedWSDLContract(endpoint); - } catch (WSDLException e) { - throw new RuntimeException(e); - } - } else { - // get the local version of the endpoint - // this local version won't have been serialized - // won't be marked as remote and will have the - // full interface contract information - endpoint = localEndpoints.get(endpoint.getURI()); - } - return endpoint; - } - - - private void setNormailizedWSDLContract(Endpoint endpoint) throws WSDLException { - String wsdl = endpointWsdls == null ? null : (String)endpointWsdls.get(endpoint.getURI()); - if (wsdl == null || wsdl.length() < 1) { - return; - } - InterfaceContract ic = endpoint.getComponentServiceInterfaceContract(); - WSDLFactory wsdlFactory = registry.getExtensionPoint(FactoryExtensionPoint.class).getFactory(WSDLFactory.class); - WSDLInterfaceContract wsdlIC = wsdlFactory.createWSDLInterfaceContract(); - WSDLInterface wsdlIface = wsdlFactory.createWSDLInterface(); - WSDLDefinition wsdlDef = wsdlFactory.createWSDLDefinition(); - WSDLReader reader = javax.wsdl.factory.WSDLFactory.newInstance().newWSDLReader(); - InputSource inputSource = new InputSource(new StringReader(wsdl)); - Definition def = reader.readWSDL("", inputSource); - wsdlDef.setDefinition(def); - wsdlIface.setWsdlDefinition(wsdlDef); - wsdlIC.setInterface(wsdlIface); - ic.setNormailizedWSDLContract(wsdlIC); - } - - private boolean isLocal(Endpoint endpoint) { - return localEndpoints.containsKey(endpoint.getURI()); - } - - public Endpoint getEndpoint(String uri) { - return localizeEndpoint((Endpoint)endpointMap.get(uri)); - } - - public List getEndpoints() { - ArrayList eps = new ArrayList(); - for (Object ep : endpointMap.values()) { - eps.add(localizeEndpoint((Endpoint)ep)); - } - return eps; - } - - public void removeEndpoint(Endpoint endpoint) { - if (hazelcastInstance == null) { - return; - } - synchronized (shutdownMutex) { - String localMemberAddr = hazelcastInstance.getCluster().getLocalMember().getInetSocketAddress().toString(); - String endpointURI = endpoint.getURI(); - - Transaction txn = hazelcastInstance.getTransaction(); - txn.begin(); - try { - endpointOwners.remove(localMemberAddr, endpointURI); - endpointMap.remove(endpointURI); - endpointWsdls.remove(endpointURI); - txn.commit(); - } catch (Throwable e) { - txn.rollback(); - throw new ServiceRuntimeException(e); - } - localEndpoints.remove(endpointURI); - logger.info("Removed endpoint - " + endpoint); - } - } - - - public void entryAdded(EntryEvent event) { - entryAdded(event.getKey(), event.getValue()); - } - - public void entryEvicted(EntryEvent event) { - // Should not happen - } - - public void entryRemoved(EntryEvent event) { - entryRemoved(event.getKey(), event.getValue()); - } - - public void entryUpdated(EntryEvent event) { - entryUpdated(event.getKey(), null, event.getValue()); - } - - public void entryAdded(Object key, Object value) { - Endpoint newEp = (Endpoint)value; - if (!isLocal(newEp)) { - logger.info(" Remote endpoint added: " + newEp); - } - endpointAdded(newEp); - } - - public void entryRemoved(Object key, Object value) { - Endpoint oldEp = (Endpoint)value; - if (!isLocal(oldEp)) { - logger.info(" Remote endpoint removed: " + value); - } - endpointRemoved(oldEp); - } - - public void entryUpdated(Object key, Object oldValue, Object newValue) { - Endpoint oldEp = (Endpoint)oldValue; - Endpoint newEp = (Endpoint)newValue; - if (!isLocal(newEp)) { - logger.info(" Remote endpoint updated: " + newEp); - } - endpointUpdated(oldEp, newEp); - } - - public void memberAdded(MembershipEvent event) { - } - - public void memberRemoved(MembershipEvent event) { - try { - String memberAddr = event.getMember().getInetSocketAddress().toString(); - if (endpointOwners.containsKey(memberAddr)) { - synchronized (shutdownMutex) { - ILock lock = hazelcastInstance.getLock("EndpointOwners/" + memberAddr); - lock.lock(); - try { - if (endpointOwners.containsKey(memberAddr)) { - Collection keys = endpointOwners.remove(memberAddr); - for (Object k : keys) { - endpointMap.remove(k); - endpointWsdls.remove(k); - } - } - if (runningCompositeOwners.containsKey(memberAddr)) { - Map> cs = runningCompositeOwners.remove(memberAddr); - for (String curi : cs.keySet()) { - Map rcs = runningComposites.get(curi); - for (QName qn : cs.get(curi)) { - rcs.remove(qn); - } - } - } - } finally { - lock.unlock(); - } - } - } - } catch (Exception e) { - if (e.getCause() != null && e.getCause().getCause() != null) { - // ignore hazelcast already shutdown exception - if (!"Hazelcast Instance is not active!".equals(e.getCause().getCause().getMessage())) { - throw new ServiceRuntimeException(e); - } - } - } - } - - public Member getOwningMember(String serviceURI) { - for (String memberAddr : endpointOwners.keySet()) { - for (String service : endpointOwners.get(memberAddr)) { - Endpoint ep = assemblyFactory.createEndpoint(); - ep.setURI(service); - if (ep.matches(serviceURI)) { - for (Member m : getHazelcastInstance().getCluster().getMembers()) { - if (memberAddr.equals(m.getInetSocketAddress().toString())) { - return m; - } - } - } - } - } - return null; - } - - public void addRunningComposite(String curi, Composite composite) { - String localMemberAddr = hazelcastInstance.getCluster().getLocalMember().getInetSocketAddress().toString(); - String compositeXML = writeComposite(composite); -// TODO: doing this in a txn causes the values to get lost - looks like a bug in hazelcast -// Transaction txn = hazelcastInstance.getTransaction(); -// txn.begin(); -// try { - Map cs = runningComposites.get(curi); - if (cs == null) { - cs = new HashMap(); - } - cs.put(composite.getName(), compositeXML); - runningComposites.put(curi, cs); - Map> ocs = runningCompositeOwners.get(localMemberAddr); - if (ocs == null) { - ocs = new HashMap>(); - } - List lcs = ocs.get(curi); - if (lcs == null) { - lcs = new ArrayList(); - ocs.put(curi, lcs); - } - lcs.add(composite.getName()); - runningCompositeOwners.put(localMemberAddr, ocs); -// txn.commit(); -// } catch (Throwable e) { -// txn.rollback(); -// throw new ServiceRuntimeException(e); -// } - } - - public void removeRunningComposite(String curi, QName name) { - String localMemberAddr = hazelcastInstance.getCluster().getLocalMember().getInetSocketAddress().toString(); - Transaction txn = hazelcastInstance.getTransaction(); - txn.begin(); - try { - Map cs = runningComposites.get(curi); - if (cs != null) { - cs.remove(name); - } - Map> ocs = runningCompositeOwners.get(localMemberAddr); - if (ocs != null) { - List xya = ocs.get(curi); - if (xya != null) { - xya.remove(name); - } - } - txn.commit(); - } catch (Throwable e) { - txn.rollback(); - throw new ServiceRuntimeException(e); - } - } - - public Map> getRunningCompositeNames() { - Map> compositeNames = new HashMap>(); - for (String curi : runningComposites.keySet()) { - List names = new ArrayList(); - compositeNames.put(curi, names); - for (QName qn : runningComposites.get(curi).keySet()) { - names.add(qn); - } - } - return compositeNames; - } - - @Override - public Composite getRunningComposite(String contributionURI, QName name) { - Map cs = runningComposites.get(contributionURI); - if (cs != null) { - String compositeXML = cs.get(name); - return readComposite(compositeXML); - } - return null; - } - - protected Composite readComposite(String compositeXML) { - try { - StAXHelper stAXHelper = StAXHelper.getInstance(registry); - StAXArtifactProcessorExtensionPoint staxProcessors = registry.getExtensionPoint(StAXArtifactProcessorExtensionPoint.class); - ExtensibleStAXArtifactProcessor staxProcessor = new ExtensibleStAXArtifactProcessor(staxProcessors, stAXHelper.getInputFactory(), null); - XMLStreamReader reader = stAXHelper.createXMLStreamReader(compositeXML); - Composite composite = (Composite)staxProcessor.read(reader, new ProcessorContext(registry)); - return composite; - } catch (XMLStreamException e) { - throw new RuntimeException(e); - } catch (ContributionReadException e) { - throw new RuntimeException(e); - } - } - - protected String writeComposite(Composite composite) { - try { - StAXHelper stAXHelper = StAXHelper.getInstance(registry); - StAXArtifactProcessorExtensionPoint staxProcessors = registry.getExtensionPoint(StAXArtifactProcessorExtensionPoint.class); - ExtensibleStAXArtifactProcessor staxProcessor = new ExtensibleStAXArtifactProcessor(staxProcessors, null, stAXHelper.getOutputFactory()); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - staxProcessor.write(composite, bos, new ProcessorContext(registry)); - bos.close(); - return bos.toString(); - } catch (ContributionWriteException e) { - throw new RuntimeException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public List getInstalledContributionURIs() { - return new ArrayList(installedContributions.keySet()); - } - - public InstalledContribution getInstalledContribution(String uri) { - return installedContributions.get(uri); - } - - public void uninstallContribution(String uri) { - installedContributions.remove(uri); - } - - @Override - public void installContribution(InstalledContribution ic) { - installedContributions.put(ic.getURI(), ic); - } - - @Override - public void updateInstalledContribution(InstalledContribution ic) { - installedContributions.put(ic.getURI(), ic); - } - -} diff --git a/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastDomainRegistryFactory.java b/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastDomainRegistryFactory.java deleted file mode 100644 index a96648d365..0000000000 --- a/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastDomainRegistryFactory.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.endpoint.hazelcast; - -import java.util.Properties; - -import org.apache.tuscany.sca.core.ExtensionPointRegistry; -import org.apache.tuscany.sca.core.UtilityExtensionPoint; -import org.apache.tuscany.sca.runtime.BaseDomainRegistryFactory; -import org.apache.tuscany.sca.runtime.DomainRegistry; -import org.apache.tuscany.sca.runtime.RuntimeProperties; - -/** - * The utility responsible for finding the endpoint regstry by the scheme and creating instances for the - * given domain - */ -public class HazelcastDomainRegistryFactory extends BaseDomainRegistryFactory { - private final static String[] schemes = new String[] {"multicast", "wka", "tuscany", "hazelcast", "uri"}; - - public HazelcastDomainRegistryFactory(ExtensionPointRegistry registry) { - super(registry); - } - - protected DomainRegistry createEndpointRegistry(String endpointRegistryURI, String domainURI) { - Properties properties = registry.getExtensionPoint(UtilityExtensionPoint.class).getUtility(RuntimeProperties.class).getProperties(); - return new HazelcastDomainRegistry(registry, properties, endpointRegistryURI, domainURI); - } - - public String[] getSupportedSchemes() { - return schemes; - } -} diff --git a/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/RegistryConfig.java b/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/RegistryConfig.java deleted file mode 100644 index 4bf3d6c73a..0000000000 --- a/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/RegistryConfig.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.endpoint.hazelcast; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - - -/** - * Utility to parse the config properties. - * - * bind - ip[:port] - defines the local bind address and port, it defaults port 14820 and if that port in use it will try - * incrementing by one till a free port is found. - * - * multicast - groupip:port | off - defines if multicast discovery is used and if so what multicast ip group and port is used - * defaults to 224.5.12.10:51482. A value of off means multicast is disabled. - * - * wka - ip[:port] - a comma separated list of ip address and port for remote nodes in the domain group. The port defaults to 14820. - * - * userid - is the userid other nodes must use to connect to this domain group. The default is the default domain name. - * - * password - is the password other nodes must use to connect to this domain group. The default is 'tuscany'. - * - * client - true means this is an SCAClient call - * - */ -public class RegistryConfig { - - private String bindAddress = "*"; - private int bindPort = 14820; - private boolean multicastDisabled = false; - private String multicastAddress = "224.5.12.10"; - private int multicastPort = 51482; - private List wkas = new ArrayList(); - private String userid; - private String password; - boolean client; - - public RegistryConfig(Properties properties) { - init(properties); - } - - private void init(Properties properties) { - - String bindValue = properties.getProperty("bind"); - if (bindValue != null) { - if (bindValue.indexOf(":") == -1) { - this.bindAddress = bindValue; - } else { - String[] addr = bindValue.split(":"); - this.bindAddress = addr[0]; - this.bindPort = Integer.parseInt(addr[1]); - } - } - - String multicastValue = properties.getProperty("multicast"); - if (multicastValue != null) { - if ("off".equalsIgnoreCase(multicastValue)) { - this.multicastDisabled = true; - } else { - if (multicastValue.indexOf(":") == -1) { - this.multicastAddress = multicastValue; - } else { - String[] addr = multicastValue.split(":"); - this.multicastAddress = addr[0]; - this.multicastPort = Integer.parseInt(addr[1]); - } - } - } - - String wkaValue = properties.getProperty("wka"); - if (wkaValue != null) { - String[] ips = wkaValue.split(","); - for (String ip : ips) { - if (ip.indexOf(":") == -1) { - wkas.add(ip + ":14820"); - } else { - wkas.add(ip); - } - } - } - - this.client = Boolean.parseBoolean(properties.getProperty("client", "false")); - this.password = properties.getProperty("password", "tuscany"); - this.userid = properties.getProperty("userid", properties.getProperty("defaultDomainName", "default")); - - } - - public String getBindAddress() { - return bindAddress; - } - - public int getBindPort() { - return bindPort; - } - - public boolean isMulticastDisabled() { - return multicastDisabled; - } - - public String getMulticastAddress() { - return multicastAddress; - } - - public int getMulticastPort() { - return multicastPort; - } - - public List getWKAs() { - return wkas; - } - - public String getUserid() { - return userid; - } - public String getPassword() { - return password; - } - - /** - * Parse the config string into a Properties object. - * The config URI has the following format: - * uri:?name=value&... - */ - public static RegistryConfig parseConfigURI(String configURI) { - Properties properties = new Properties(); - int c = configURI.indexOf(':'); - if (c > -1) { - configURI = configURI.substring(c+1); - } - int qm = configURI.indexOf('?'); - if (qm < 0) { - properties.setProperty("defaultDomainName", configURI); - } else { - if (qm == 0) { - properties.setProperty("defaultDomainName", "default"); - } else { - properties.setProperty("defaultDomainName", configURI.substring(0, qm)); - } - if (configURI.length() > qm+1) { - Map params = new HashMap(); - for (String param : configURI.substring(qm+1).split("&")) { - String[] px = param.split("="); - if (px.length == 2) { - params.put(px[0], px[1]); - } else { - params.put(px[0], ""); - } - } - for (String name : params.keySet()) { - properties.setProperty(name, params.get(name)); - } - } - } - return new RegistryConfig(properties); - } -} diff --git a/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/client/HazelcastClientDomainRegistryFactory.java b/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/client/HazelcastClientDomainRegistryFactory.java deleted file mode 100644 index f6a5b487e4..0000000000 --- a/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/client/HazelcastClientDomainRegistryFactory.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.endpoint.hazelcast.client; - -import org.apache.tuscany.sca.core.ExtensionPointRegistry; -import org.apache.tuscany.sca.runtime.BaseDomainRegistryFactory; -import org.apache.tuscany.sca.runtime.DomainRegistry; - -/** - * The utility responsible for finding the endpoint regstry by the scheme and creating instances for the - * given domain - */ -public class HazelcastClientDomainRegistryFactory extends BaseDomainRegistryFactory { - private final static String[] schemes = new String[] {"hazelcastclient", "tuscanyclient"}; - - /** - * @param extensionRegistry - */ - public HazelcastClientDomainRegistryFactory(ExtensionPointRegistry registry) { - super(registry); - } - - protected DomainRegistry createEndpointRegistry(String endpointRegistryURI, String domainURI) { - DomainRegistry domainRegistry = - new HazelcastClientEndpointRegistry(registry, null, endpointRegistryURI, domainURI); - return domainRegistry; - } - - public String[] getSupportedSchemes() { - return schemes; - } -} diff --git a/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/client/HazelcastClientEndpointRegistry.java b/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/client/HazelcastClientEndpointRegistry.java deleted file mode 100644 index ce1075f207..0000000000 --- a/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/client/HazelcastClientEndpointRegistry.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.endpoint.hazelcast.client; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.Socket; -import java.util.Map; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.tuscany.sca.core.ExtensionPointRegistry; -import org.apache.tuscany.sca.core.UtilityExtensionPoint; -import org.apache.tuscany.sca.endpoint.hazelcast.HazelcastDomainRegistry; -import org.apache.tuscany.sca.endpoint.hazelcast.RegistryConfig; -import org.apache.tuscany.sca.runtime.RuntimeProperties; - -import com.hazelcast.client.ClientProperties; -import com.hazelcast.client.ClientProperties.ClientPropertyName; -import com.hazelcast.client.HazelcastClient; -import com.hazelcast.core.HazelcastInstance; - -/** - * An DomainRegistry using a Hazelcast Native Client - */ -public class HazelcastClientEndpointRegistry extends HazelcastDomainRegistry { - - RegistryConfig rc; - HazelcastClient hazelcastClient; - - public HazelcastClientEndpointRegistry(ExtensionPointRegistry registry, - Map attributes, - String domainRegistryURI, - String domainURI) { - super(registry, attributes, domainRegistryURI, domainURI); - } - - @Override - public void start() { - if (endpointMap != null) { - throw new IllegalStateException("The registry has already been started"); - } - initHazelcastClientInstance(); - endpointMap = hazelcastClient.getMap(rc.getUserid() + "/Endpoints"); - endpointOwners = hazelcastClient.getMultiMap(rc.getUserid() + "/EndpointOwners"); - } - - @Override - public void stop() { - if (hazelcastClient != null) { - hazelcastClient.shutdown(); - hazelcastClient = null; - endpointMap = null; - } - } - - private void initHazelcastClientInstance() { - if (this.domainURI == null) { - this.properties = registry.getExtensionPoint(UtilityExtensionPoint.class).getUtility(RuntimeProperties.class).getProperties(); - this.domainURI = properties.getProperty("defaultDomainName", "default"); - } - this.rc = RegistryConfig.parseConfigURI(domainURI); - if (rc.getWKAs().size() < 1) { - String ip = getDefaultWKA(); - if (ip != null) { - rc.getWKAs().add(ip); - } - } - if (rc.getWKAs().size() < 1) { - throw new IllegalArgumentException("No local domain instance found, please use domain URI 'wka=' argument to define IP address(es) for domain"); - } - - // Hazelcast is outputs a lot on info level log messages which are unnecessary for us, - // so disable info logging for hazelcast client classes unless fine logging is on for tuscany. - if (!Logger.getLogger(this.getClass().getName()).isLoggable(Level.CONFIG)) { - Logger hzl = Logger.getLogger("com.hazelcast"); - if (!hzl.isLoggable(Level.FINE)) { - hzl.setLevel(Level.WARNING); - } - } - - ClientProperties clientProps = ClientProperties.crateBaseClientProperties(rc.getUserid(), rc.getPassword()); - clientProps.setPropertyValue(ClientPropertyName.INIT_CONNECTION_ATTEMPTS_LIMIT, "1"); - this.hazelcastClient = HazelcastClient.newHazelcastClient(clientProps, rc.getWKAs().toArray(new String[0])); - } - - @Override - public HazelcastInstance getHazelcastInstance() { - return hazelcastClient; - } - - /** - * As a default connect to a local runtime instance listening on port 14820 - */ - protected static String getDefaultWKA() { - Socket s = null; - try { - s = new Socket(InetAddress.getLocalHost(), 14820); - if (s.isConnected()) { - return s.getInetAddress().getHostAddress() + ":14820"; - } - } catch (IOException e) { - } finally { - if (s != null) { - try { - s.close(); - } catch (IOException e) { - } - } - } - return null; - } -} diff --git a/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/registry/hazelcast/HazelcastDomainRegistry.java b/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/registry/hazelcast/HazelcastDomainRegistry.java new file mode 100644 index 0000000000..f74375eca3 --- /dev/null +++ b/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/registry/hazelcast/HazelcastDomainRegistry.java @@ -0,0 +1,642 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.registry.hazelcast; + +import java.io.ByteArrayOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.StringReader; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.wsdl.Definition; +import javax.wsdl.WSDLException; +import javax.wsdl.xml.WSDLReader; +import javax.wsdl.xml.WSDLWriter; +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; + +import org.apache.tuscany.sca.assembly.AssemblyFactory; +import org.apache.tuscany.sca.assembly.Composite; +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.common.xml.stax.StAXHelper; +import org.apache.tuscany.sca.contribution.processor.ContributionReadException; +import org.apache.tuscany.sca.contribution.processor.ContributionWriteException; +import org.apache.tuscany.sca.contribution.processor.ExtensibleStAXArtifactProcessor; +import org.apache.tuscany.sca.contribution.processor.ProcessorContext; +import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessorExtensionPoint; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.FactoryExtensionPoint; +import org.apache.tuscany.sca.core.LifeCycleListener; +import org.apache.tuscany.sca.core.UtilityExtensionPoint; +import org.apache.tuscany.sca.interfacedef.InterfaceContract; +import org.apache.tuscany.sca.interfacedef.wsdl.WSDLDefinition; +import org.apache.tuscany.sca.interfacedef.wsdl.WSDLFactory; +import org.apache.tuscany.sca.interfacedef.wsdl.WSDLInterface; +import org.apache.tuscany.sca.interfacedef.wsdl.WSDLInterfaceContract; +import org.apache.tuscany.sca.runtime.BaseDomainRegistry; +import org.apache.tuscany.sca.runtime.ContributionListener; +import org.apache.tuscany.sca.runtime.DomainRegistry; +import org.apache.tuscany.sca.runtime.InstalledContribution; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.apache.tuscany.sca.runtime.RuntimeProperties; +import org.oasisopen.sca.ServiceRuntimeException; +import org.xml.sax.InputSource; + +import com.hazelcast.config.Config; +import com.hazelcast.config.NearCacheConfig; +import com.hazelcast.config.TcpIpConfig; +import com.hazelcast.config.XmlConfigBuilder; +import com.hazelcast.core.EntryEvent; +import com.hazelcast.core.EntryListener; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.core.ILock; +import com.hazelcast.core.IMap; +import com.hazelcast.core.Member; +import com.hazelcast.core.MembershipEvent; +import com.hazelcast.core.MembershipListener; +import com.hazelcast.core.MultiMap; +import com.hazelcast.core.Transaction; +import com.hazelcast.nio.Address; + +/** + * An DomainRegistry using a Hazelcast + */ +public class HazelcastDomainRegistry extends BaseDomainRegistry implements DomainRegistry, LifeCycleListener, EntryListener, MembershipListener { + private final static Logger logger = Logger.getLogger(HazelcastDomainRegistry.class.getName()); + + private HazelcastInstance hazelcastInstance; + + protected Map endpointMap; + protected MultiMap endpointOwners; + protected Map> runningComposites; + protected Map>> runningCompositeOwners; + + protected Map endpointWsdls; + protected Map localEndpoints = new ConcurrentHashMap(); + + protected Map installedContributions; + + protected AssemblyFactory assemblyFactory; + protected Object shutdownMutex = new Object(); + protected Properties properties; + + public HazelcastDomainRegistry(ExtensionPointRegistry registry, Properties properties, String endpointRegistryURI, String domainURI) { + super(registry, null, endpointRegistryURI, domainURI); + this.assemblyFactory = registry.getExtensionPoint(FactoryExtensionPoint.class).getFactory(AssemblyFactory.class); + this.properties = properties; + } + + public HazelcastDomainRegistry(ExtensionPointRegistry registry, + Map attributes, + String domainRegistryURI, + String domainURI) { + super(registry, attributes, domainRegistryURI, domainURI); + this.assemblyFactory = registry.getExtensionPoint(FactoryExtensionPoint.class).getFactory(AssemblyFactory.class); + this.properties = registry.getExtensionPoint(UtilityExtensionPoint.class).getUtility(RuntimeProperties.class).getProperties(); + } + + public HazelcastInstance getHazelcastInstance() { + return hazelcastInstance; + } + + public void start() { + if (endpointMap != null) { + throw new IllegalStateException("The registry has already been started"); + } +// if (configURI.toString().startsWith("tuscany:vm:")) { +// endpointMap = new HashMap(); +// } else { + initHazelcastInstance(); + IMap imap = hazelcastInstance.getMap(domainURI + "/Endpoints"); + imap.addEntryListener(this, true); + endpointMap = imap; + + endpointOwners = hazelcastInstance.getMultiMap(domainURI + "/EndpointOwners"); + endpointWsdls = hazelcastInstance.getMap(domainURI + "/EndpointWsdls"); + + runningComposites = hazelcastInstance.getMap(domainURI + "/RunningComposites"); + runningCompositeOwners = hazelcastInstance.getMap(domainURI + "/RunningCompositeOwners"); + + installedContributions = hazelcastInstance.getMap(domainURI + "/InstalledContributions"); + ((IMap)installedContributions).addEntryListener(new EntryListener() { + public void entryAdded(EntryEvent event) { + } + public void entryRemoved(EntryEvent event) { + for (ContributionListener listener : contributionlisteners) { + listener.contributionRemoved(event.getKey()); + } + } + public void entryUpdated(EntryEvent event) { + for (ContributionListener listener : contributionlisteners) { + listener.contributionUpdated(event.getKey()); + } + } + public void entryEvicted(EntryEvent event) { + } + }, false); + + hazelcastInstance.getCluster().addMembershipListener(this); +// } + } + + public void stop() { + if (hazelcastInstance != null) { + synchronized (shutdownMutex) { + hazelcastInstance.shutdown(); + hazelcastInstance = null; + endpointMap = null; + endpointOwners = null; + endpointWsdls = null; + runningComposites = null; + runningCompositeOwners = null; + } + } + } + + private void initHazelcastInstance() { + + // Hazelcast is outputs a lot on info level log messages which are unnecessary for us, + // so disable info logging for hazelcast client classes unless fine logging is on for tuscany. + if (!logger.isLoggable(Level.CONFIG)) { + Logger hzl = Logger.getLogger("com.hazelcast"); + if (!hzl.isLoggable(Level.FINE)) { + hzl.setLevel(Level.WARNING); + // we want the ClusterManager info messages so we can see nodes come and go + Logger.getLogger("com.hazelcast.cluster.ClusterManager").setLevel(Level.INFO); + // we don't want any of the XmlConfigBuilder warnings as set the config programatically + Logger.getLogger("com.hazelcast.config.XmlConfigBuilder").setLevel(Level.SEVERE); + } + } + + Config config = getHazelcastConfig(); + + // do this when theres a way to have adders be the key owners + // config.getMapConfig(configURI.getDomainName() + "/Endpoints").setBackupCount(0); + + // this caches reads locally + config.getMapConfig("default").setNearCacheConfig(new NearCacheConfig(0, 0, "NONE", 0, true)); + + // Disable the Hazelcast shutdown hook as Tuscany has its own and with both there are race conditions + config.setProperty("hazelcast.shutdownhook.enabled", + // GroupProperties.PROP_SHUTDOWNHOOK_ENABLED, + "false"); + + // By default this is 5 seconds, not sure what the implications are but dropping it down to 1 makes + // things like the samples look much faster + config.setProperty("hazelcast.wait.seconds.before.join", + // GroupProperties.PROP_WAIT_SECONDS_BEFORE_JOIN, + "1"); + + this.hazelcastInstance = Hazelcast.newHazelcastInstance(config); + if (logger.isLoggable(Level.INFO)) { + logger.info("started node in domain '" + domainURI + "' + at: " + hazelcastInstance.getCluster().getLocalMember().getInetSocketAddress()); + } + } + + protected Config getHazelcastConfig() { + Config config; + this.properties = registry.getExtensionPoint(UtilityExtensionPoint.class).getUtility(RuntimeProperties.class).getProperties(); + String configFile = properties.getProperty("hazelcastConfig"); + if (configFile != null) { + try { + config = new XmlConfigBuilder(configFile).build(); + } catch (FileNotFoundException e) { + throw new IllegalArgumentException(configFile, e); + } + } else { + // TUSCANY-3675 - domainRegistryURI properties don't seem to be copied into the + // properties collection anywhere + config = new XmlConfigBuilder().build(); + RegistryConfig rc = RegistryConfig.parseConfigURI(domainRegistryURI); + config.setPort(rc.getBindPort()); + //config.setPortAutoIncrement(false); + + if (!rc.getBindAddress().equals("*")) { + config.getNetworkConfig().getInterfaces().setEnabled(true); + config.getNetworkConfig().getInterfaces().clear(); + config.getNetworkConfig().getInterfaces().addInterface(rc.getBindAddress()); + } + + config.getGroupConfig().setName(rc.getUserid()); + config.getGroupConfig().setPassword(rc.getPassword()); + + if (rc.isMulticastDisabled()) { + config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false); + } else { + config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(true); + config.getNetworkConfig().getJoin().getMulticastConfig().setMulticastPort(rc.getMulticastPort()); + config.getNetworkConfig().getJoin().getMulticastConfig().setMulticastGroup(rc.getMulticastAddress()); + } + + if (rc.getWKAs().size() > 0) { + TcpIpConfig tcpconfig = config.getNetworkConfig().getJoin().getTcpIpConfig(); + tcpconfig.setEnabled(true); + List
lsMembers = tcpconfig.getAddresses(); + lsMembers.clear(); + for (String addr : rc.getWKAs()) { + String[] ipNPort = addr.split(":"); + try { + lsMembers.add(new Address(ipNPort[0], Integer.parseInt(ipNPort[1]))); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + } + } + return config; + } + + public void addEndpoint(Endpoint endpoint) { + if (findEndpoint(endpoint.getURI()).size() > 0) { + Member m = getOwningMember(endpoint.getURI()); + throw new IllegalStateException("Endpoint " + endpoint.getURI() + " already exists in domain " + domainURI + " at " + m.getInetSocketAddress()); + } + + String localMemberAddr = hazelcastInstance.getCluster().getLocalMember().getInetSocketAddress().toString(); + String endpointURI = endpoint.getURI(); + String wsdl = getWsdl(endpoint); + Transaction txn = hazelcastInstance.getTransaction(); + txn.begin(); + try { + localEndpoints.put(endpointURI, endpoint); + endpointMap.put(endpointURI, endpoint); + endpointWsdls.put(endpointURI, wsdl); + endpointOwners.put(localMemberAddr, endpointURI); + txn.commit(); + } catch (Throwable e) { + txn.rollback(); + throw new ServiceRuntimeException(e); + } + logger.info("Add endpoint - " + endpoint); + } + + private String getWsdl(Endpoint endpoint) { + WSDLInterfaceContract wsdlIC = (WSDLInterfaceContract)((RuntimeEndpoint)endpoint).getGeneratedWSDLContract(endpoint.getComponentServiceInterfaceContract()); + if (wsdlIC == null) { + return ""; + } + WSDLInterface wsdl = (WSDLInterface)wsdlIC.getInterface(); + WSDLDefinition d = wsdl.getWsdlDefinition(); + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + try { + WSDLWriter writer = javax.wsdl.factory.WSDLFactory.newInstance().newWSDLWriter(); + writer.writeWSDL(d.getDefinition(), outStream); + } catch (Exception e){ + throw new RuntimeException(e); + } + return outStream.toString(); + } + + public List findEndpoint(String uri) { + List foundEndpoints = new ArrayList(); + for (Object v : endpointMap.values()) { + Endpoint endpoint = (Endpoint)v; + logger.fine("Matching against - " + endpoint); + if (endpoint.matches(uri)) { + endpoint = localizeEndpoint(endpoint); + foundEndpoints.add(endpoint); + logger.fine("Found endpoint with matching service - " + endpoint); + } + } + return foundEndpoints; + } + + private Endpoint localizeEndpoint(Endpoint endpoint) { + if (endpoint == null) return null; + if (!isLocal(endpoint)) { + endpoint.setRemote(true); + ((RuntimeEndpoint)endpoint).bind(registry, this); + try { + setNormailizedWSDLContract(endpoint); + } catch (WSDLException e) { + throw new RuntimeException(e); + } + } else { + // get the local version of the endpoint + // this local version won't have been serialized + // won't be marked as remote and will have the + // full interface contract information + endpoint = localEndpoints.get(endpoint.getURI()); + } + return endpoint; + } + + + private void setNormailizedWSDLContract(Endpoint endpoint) throws WSDLException { + String wsdl = endpointWsdls == null ? null : (String)endpointWsdls.get(endpoint.getURI()); + if (wsdl == null || wsdl.length() < 1) { + return; + } + InterfaceContract ic = endpoint.getComponentServiceInterfaceContract(); + WSDLFactory wsdlFactory = registry.getExtensionPoint(FactoryExtensionPoint.class).getFactory(WSDLFactory.class); + WSDLInterfaceContract wsdlIC = wsdlFactory.createWSDLInterfaceContract(); + WSDLInterface wsdlIface = wsdlFactory.createWSDLInterface(); + WSDLDefinition wsdlDef = wsdlFactory.createWSDLDefinition(); + WSDLReader reader = javax.wsdl.factory.WSDLFactory.newInstance().newWSDLReader(); + InputSource inputSource = new InputSource(new StringReader(wsdl)); + Definition def = reader.readWSDL("", inputSource); + wsdlDef.setDefinition(def); + wsdlIface.setWsdlDefinition(wsdlDef); + wsdlIC.setInterface(wsdlIface); + ic.setNormailizedWSDLContract(wsdlIC); + } + + private boolean isLocal(Endpoint endpoint) { + return localEndpoints.containsKey(endpoint.getURI()); + } + + public Endpoint getEndpoint(String uri) { + return localizeEndpoint((Endpoint)endpointMap.get(uri)); + } + + public List getEndpoints() { + ArrayList eps = new ArrayList(); + for (Object ep : endpointMap.values()) { + eps.add(localizeEndpoint((Endpoint)ep)); + } + return eps; + } + + public void removeEndpoint(Endpoint endpoint) { + if (hazelcastInstance == null) { + return; + } + synchronized (shutdownMutex) { + String localMemberAddr = hazelcastInstance.getCluster().getLocalMember().getInetSocketAddress().toString(); + String endpointURI = endpoint.getURI(); + + Transaction txn = hazelcastInstance.getTransaction(); + txn.begin(); + try { + endpointOwners.remove(localMemberAddr, endpointURI); + endpointMap.remove(endpointURI); + endpointWsdls.remove(endpointURI); + txn.commit(); + } catch (Throwable e) { + txn.rollback(); + throw new ServiceRuntimeException(e); + } + localEndpoints.remove(endpointURI); + logger.info("Removed endpoint - " + endpoint); + } + } + + + public void entryAdded(EntryEvent event) { + entryAdded(event.getKey(), event.getValue()); + } + + public void entryEvicted(EntryEvent event) { + // Should not happen + } + + public void entryRemoved(EntryEvent event) { + entryRemoved(event.getKey(), event.getValue()); + } + + public void entryUpdated(EntryEvent event) { + entryUpdated(event.getKey(), null, event.getValue()); + } + + public void entryAdded(Object key, Object value) { + Endpoint newEp = (Endpoint)value; + if (!isLocal(newEp)) { + logger.info(" Remote endpoint added: " + newEp); + } + endpointAdded(newEp); + } + + public void entryRemoved(Object key, Object value) { + Endpoint oldEp = (Endpoint)value; + if (!isLocal(oldEp)) { + logger.info(" Remote endpoint removed: " + value); + } + endpointRemoved(oldEp); + } + + public void entryUpdated(Object key, Object oldValue, Object newValue) { + Endpoint oldEp = (Endpoint)oldValue; + Endpoint newEp = (Endpoint)newValue; + if (!isLocal(newEp)) { + logger.info(" Remote endpoint updated: " + newEp); + } + endpointUpdated(oldEp, newEp); + } + + public void memberAdded(MembershipEvent event) { + } + + public void memberRemoved(MembershipEvent event) { + try { + String memberAddr = event.getMember().getInetSocketAddress().toString(); + if (endpointOwners.containsKey(memberAddr)) { + synchronized (shutdownMutex) { + ILock lock = hazelcastInstance.getLock("EndpointOwners/" + memberAddr); + lock.lock(); + try { + if (endpointOwners.containsKey(memberAddr)) { + Collection keys = endpointOwners.remove(memberAddr); + for (Object k : keys) { + endpointMap.remove(k); + endpointWsdls.remove(k); + } + } + if (runningCompositeOwners.containsKey(memberAddr)) { + Map> cs = runningCompositeOwners.remove(memberAddr); + for (String curi : cs.keySet()) { + Map rcs = runningComposites.get(curi); + for (QName qn : cs.get(curi)) { + rcs.remove(qn); + } + } + } + } finally { + lock.unlock(); + } + } + } + } catch (Exception e) { + if (e.getCause() != null && e.getCause().getCause() != null) { + // ignore hazelcast already shutdown exception + if (!"Hazelcast Instance is not active!".equals(e.getCause().getCause().getMessage())) { + throw new ServiceRuntimeException(e); + } + } + } + } + + public Member getOwningMember(String serviceURI) { + for (String memberAddr : endpointOwners.keySet()) { + for (String service : endpointOwners.get(memberAddr)) { + Endpoint ep = assemblyFactory.createEndpoint(); + ep.setURI(service); + if (ep.matches(serviceURI)) { + for (Member m : getHazelcastInstance().getCluster().getMembers()) { + if (memberAddr.equals(m.getInetSocketAddress().toString())) { + return m; + } + } + } + } + } + return null; + } + + public void addRunningComposite(String curi, Composite composite) { + String localMemberAddr = hazelcastInstance.getCluster().getLocalMember().getInetSocketAddress().toString(); + String compositeXML = writeComposite(composite); +// TODO: doing this in a txn causes the values to get lost - looks like a bug in hazelcast +// Transaction txn = hazelcastInstance.getTransaction(); +// txn.begin(); +// try { + Map cs = runningComposites.get(curi); + if (cs == null) { + cs = new HashMap(); + } + cs.put(composite.getName(), compositeXML); + runningComposites.put(curi, cs); + Map> ocs = runningCompositeOwners.get(localMemberAddr); + if (ocs == null) { + ocs = new HashMap>(); + } + List lcs = ocs.get(curi); + if (lcs == null) { + lcs = new ArrayList(); + ocs.put(curi, lcs); + } + lcs.add(composite.getName()); + runningCompositeOwners.put(localMemberAddr, ocs); +// txn.commit(); +// } catch (Throwable e) { +// txn.rollback(); +// throw new ServiceRuntimeException(e); +// } + } + + public void removeRunningComposite(String curi, QName name) { + String localMemberAddr = hazelcastInstance.getCluster().getLocalMember().getInetSocketAddress().toString(); + Transaction txn = hazelcastInstance.getTransaction(); + txn.begin(); + try { + Map cs = runningComposites.get(curi); + if (cs != null) { + cs.remove(name); + } + Map> ocs = runningCompositeOwners.get(localMemberAddr); + if (ocs != null) { + List xya = ocs.get(curi); + if (xya != null) { + xya.remove(name); + } + } + txn.commit(); + } catch (Throwable e) { + txn.rollback(); + throw new ServiceRuntimeException(e); + } + } + + public Map> getRunningCompositeNames() { + Map> compositeNames = new HashMap>(); + for (String curi : runningComposites.keySet()) { + List names = new ArrayList(); + compositeNames.put(curi, names); + for (QName qn : runningComposites.get(curi).keySet()) { + names.add(qn); + } + } + return compositeNames; + } + + @Override + public Composite getRunningComposite(String contributionURI, QName name) { + Map cs = runningComposites.get(contributionURI); + if (cs != null) { + String compositeXML = cs.get(name); + return readComposite(compositeXML); + } + return null; + } + + protected Composite readComposite(String compositeXML) { + try { + StAXHelper stAXHelper = StAXHelper.getInstance(registry); + StAXArtifactProcessorExtensionPoint staxProcessors = registry.getExtensionPoint(StAXArtifactProcessorExtensionPoint.class); + ExtensibleStAXArtifactProcessor staxProcessor = new ExtensibleStAXArtifactProcessor(staxProcessors, stAXHelper.getInputFactory(), null); + XMLStreamReader reader = stAXHelper.createXMLStreamReader(compositeXML); + Composite composite = (Composite)staxProcessor.read(reader, new ProcessorContext(registry)); + return composite; + } catch (XMLStreamException e) { + throw new RuntimeException(e); + } catch (ContributionReadException e) { + throw new RuntimeException(e); + } + } + + protected String writeComposite(Composite composite) { + try { + StAXHelper stAXHelper = StAXHelper.getInstance(registry); + StAXArtifactProcessorExtensionPoint staxProcessors = registry.getExtensionPoint(StAXArtifactProcessorExtensionPoint.class); + ExtensibleStAXArtifactProcessor staxProcessor = new ExtensibleStAXArtifactProcessor(staxProcessors, null, stAXHelper.getOutputFactory()); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + staxProcessor.write(composite, bos, new ProcessorContext(registry)); + bos.close(); + return bos.toString(); + } catch (ContributionWriteException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public List getInstalledContributionURIs() { + return new ArrayList(installedContributions.keySet()); + } + + public InstalledContribution getInstalledContribution(String uri) { + return installedContributions.get(uri); + } + + public void uninstallContribution(String uri) { + installedContributions.remove(uri); + } + + @Override + public void installContribution(InstalledContribution ic) { + installedContributions.put(ic.getURI(), ic); + } + + @Override + public void updateInstalledContribution(InstalledContribution ic) { + installedContributions.put(ic.getURI(), ic); + } + +} diff --git a/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/registry/hazelcast/HazelcastDomainRegistryFactory.java b/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/registry/hazelcast/HazelcastDomainRegistryFactory.java new file mode 100644 index 0000000000..fda24cd78c --- /dev/null +++ b/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/registry/hazelcast/HazelcastDomainRegistryFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.registry.hazelcast; + +import java.util.Properties; + +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.UtilityExtensionPoint; +import org.apache.tuscany.sca.runtime.BaseDomainRegistryFactory; +import org.apache.tuscany.sca.runtime.DomainRegistry; +import org.apache.tuscany.sca.runtime.RuntimeProperties; + +/** + * The utility responsible for finding the endpoint regstry by the scheme and creating instances for the + * given domain + */ +public class HazelcastDomainRegistryFactory extends BaseDomainRegistryFactory { + private final static String[] schemes = new String[] {"multicast", "wka", "tuscany", "hazelcast", "uri"}; + + public HazelcastDomainRegistryFactory(ExtensionPointRegistry registry) { + super(registry); + } + + protected DomainRegistry createEndpointRegistry(String endpointRegistryURI, String domainURI) { + Properties properties = registry.getExtensionPoint(UtilityExtensionPoint.class).getUtility(RuntimeProperties.class).getProperties(); + return new HazelcastDomainRegistry(registry, properties, endpointRegistryURI, domainURI); + } + + public String[] getSupportedSchemes() { + return schemes; + } +} diff --git a/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/registry/hazelcast/RegistryConfig.java b/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/registry/hazelcast/RegistryConfig.java new file mode 100644 index 0000000000..316abc7e2d --- /dev/null +++ b/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/registry/hazelcast/RegistryConfig.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.registry.hazelcast; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + + +/** + * Utility to parse the config properties. + * + * bind - ip[:port] - defines the local bind address and port, it defaults port 14820 and if that port in use it will try + * incrementing by one till a free port is found. + * + * multicast - groupip:port | off - defines if multicast discovery is used and if so what multicast ip group and port is used + * defaults to 224.5.12.10:51482. A value of off means multicast is disabled. + * + * wka - ip[:port] - a comma separated list of ip address and port for remote nodes in the domain group. The port defaults to 14820. + * + * userid - is the userid other nodes must use to connect to this domain group. The default is the default domain name. + * + * password - is the password other nodes must use to connect to this domain group. The default is 'tuscany'. + * + * client - true means this is an SCAClient call + * + */ +public class RegistryConfig { + + private String bindAddress = "*"; + private int bindPort = 14820; + private boolean multicastDisabled = false; + private String multicastAddress = "224.5.12.10"; + private int multicastPort = 51482; + private List wkas = new ArrayList(); + private String userid; + private String password; + boolean client; + + public RegistryConfig(Properties properties) { + init(properties); + } + + private void init(Properties properties) { + + String bindValue = properties.getProperty("bind"); + if (bindValue != null) { + if (bindValue.indexOf(":") == -1) { + this.bindAddress = bindValue; + } else { + String[] addr = bindValue.split(":"); + this.bindAddress = addr[0]; + this.bindPort = Integer.parseInt(addr[1]); + } + } + + String multicastValue = properties.getProperty("multicast"); + if (multicastValue != null) { + if ("off".equalsIgnoreCase(multicastValue)) { + this.multicastDisabled = true; + } else { + if (multicastValue.indexOf(":") == -1) { + this.multicastAddress = multicastValue; + } else { + String[] addr = multicastValue.split(":"); + this.multicastAddress = addr[0]; + this.multicastPort = Integer.parseInt(addr[1]); + } + } + } + + String wkaValue = properties.getProperty("wka"); + if (wkaValue != null) { + String[] ips = wkaValue.split(","); + for (String ip : ips) { + if (ip.indexOf(":") == -1) { + wkas.add(ip + ":14820"); + } else { + wkas.add(ip); + } + } + } + + this.client = Boolean.parseBoolean(properties.getProperty("client", "false")); + this.password = properties.getProperty("password", "tuscany"); + this.userid = properties.getProperty("userid", properties.getProperty("defaultDomainName", "default")); + + } + + public String getBindAddress() { + return bindAddress; + } + + public int getBindPort() { + return bindPort; + } + + public boolean isMulticastDisabled() { + return multicastDisabled; + } + + public String getMulticastAddress() { + return multicastAddress; + } + + public int getMulticastPort() { + return multicastPort; + } + + public List getWKAs() { + return wkas; + } + + public String getUserid() { + return userid; + } + public String getPassword() { + return password; + } + + /** + * Parse the config string into a Properties object. + * The config URI has the following format: + * uri:?name=value&... + */ + public static RegistryConfig parseConfigURI(String configURI) { + Properties properties = new Properties(); + int c = configURI.indexOf(':'); + if (c > -1) { + configURI = configURI.substring(c+1); + } + int qm = configURI.indexOf('?'); + if (qm < 0) { + properties.setProperty("defaultDomainName", configURI); + } else { + if (qm == 0) { + properties.setProperty("defaultDomainName", "default"); + } else { + properties.setProperty("defaultDomainName", configURI.substring(0, qm)); + } + if (configURI.length() > qm+1) { + Map params = new HashMap(); + for (String param : configURI.substring(qm+1).split("&")) { + String[] px = param.split("="); + if (px.length == 2) { + params.put(px[0], px[1]); + } else { + params.put(px[0], ""); + } + } + for (String name : params.keySet()) { + properties.setProperty(name, params.get(name)); + } + } + } + return new RegistryConfig(properties); + } +} diff --git a/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/regsitry/hazelcast/client/HazelcastClientDomainRegistryFactory.java b/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/regsitry/hazelcast/client/HazelcastClientDomainRegistryFactory.java new file mode 100644 index 0000000000..104cf928e5 --- /dev/null +++ b/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/regsitry/hazelcast/client/HazelcastClientDomainRegistryFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.regsitry.hazelcast.client; + +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.runtime.BaseDomainRegistryFactory; +import org.apache.tuscany.sca.runtime.DomainRegistry; + +/** + * The utility responsible for finding the endpoint regstry by the scheme and creating instances for the + * given domain + */ +public class HazelcastClientDomainRegistryFactory extends BaseDomainRegistryFactory { + private final static String[] schemes = new String[] {"hazelcastclient", "tuscanyclient"}; + + /** + * @param extensionRegistry + */ + public HazelcastClientDomainRegistryFactory(ExtensionPointRegistry registry) { + super(registry); + } + + protected DomainRegistry createEndpointRegistry(String endpointRegistryURI, String domainURI) { + DomainRegistry domainRegistry = + new HazelcastClientEndpointRegistry(registry, null, endpointRegistryURI, domainURI); + return domainRegistry; + } + + public String[] getSupportedSchemes() { + return schemes; + } +} diff --git a/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/regsitry/hazelcast/client/HazelcastClientEndpointRegistry.java b/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/regsitry/hazelcast/client/HazelcastClientEndpointRegistry.java new file mode 100644 index 0000000000..e11f0a07e4 --- /dev/null +++ b/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/java/org/apache/tuscany/sca/regsitry/hazelcast/client/HazelcastClientEndpointRegistry.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.regsitry.hazelcast.client; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.Socket; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.UtilityExtensionPoint; +import org.apache.tuscany.sca.registry.hazelcast.HazelcastDomainRegistry; +import org.apache.tuscany.sca.registry.hazelcast.RegistryConfig; +import org.apache.tuscany.sca.runtime.RuntimeProperties; + +import com.hazelcast.client.ClientProperties; +import com.hazelcast.client.ClientProperties.ClientPropertyName; +import com.hazelcast.client.HazelcastClient; +import com.hazelcast.core.HazelcastInstance; + +/** + * An DomainRegistry using a Hazelcast Native Client + */ +public class HazelcastClientEndpointRegistry extends HazelcastDomainRegistry { + + RegistryConfig rc; + HazelcastClient hazelcastClient; + + public HazelcastClientEndpointRegistry(ExtensionPointRegistry registry, + Map attributes, + String domainRegistryURI, + String domainURI) { + super(registry, attributes, domainRegistryURI, domainURI); + } + + @Override + public void start() { + if (endpointMap != null) { + throw new IllegalStateException("The registry has already been started"); + } + initHazelcastClientInstance(); + endpointMap = hazelcastClient.getMap(rc.getUserid() + "/Endpoints"); + endpointOwners = hazelcastClient.getMultiMap(rc.getUserid() + "/EndpointOwners"); + } + + @Override + public void stop() { + if (hazelcastClient != null) { + hazelcastClient.shutdown(); + hazelcastClient = null; + endpointMap = null; + } + } + + private void initHazelcastClientInstance() { + if (this.domainURI == null) { + this.properties = registry.getExtensionPoint(UtilityExtensionPoint.class).getUtility(RuntimeProperties.class).getProperties(); + this.domainURI = properties.getProperty("defaultDomainName", "default"); + } + this.rc = RegistryConfig.parseConfigURI(domainURI); + if (rc.getWKAs().size() < 1) { + String ip = getDefaultWKA(); + if (ip != null) { + rc.getWKAs().add(ip); + } + } + if (rc.getWKAs().size() < 1) { + throw new IllegalArgumentException("No local domain instance found, please use domain URI 'wka=' argument to define IP address(es) for domain"); + } + + // Hazelcast is outputs a lot on info level log messages which are unnecessary for us, + // so disable info logging for hazelcast client classes unless fine logging is on for tuscany. + if (!Logger.getLogger(this.getClass().getName()).isLoggable(Level.CONFIG)) { + Logger hzl = Logger.getLogger("com.hazelcast"); + if (!hzl.isLoggable(Level.FINE)) { + hzl.setLevel(Level.WARNING); + } + } + + ClientProperties clientProps = ClientProperties.crateBaseClientProperties(rc.getUserid(), rc.getPassword()); + clientProps.setPropertyValue(ClientPropertyName.INIT_CONNECTION_ATTEMPTS_LIMIT, "1"); + this.hazelcastClient = HazelcastClient.newHazelcastClient(clientProps, rc.getWKAs().toArray(new String[0])); + } + + @Override + public HazelcastInstance getHazelcastInstance() { + return hazelcastClient; + } + + /** + * As a default connect to a local runtime instance listening on port 14820 + */ + protected static String getDefaultWKA() { + Socket s = null; + try { + s = new Socket(InetAddress.getLocalHost(), 14820); + if (s.isConnected()) { + return s.getInetAddress().getHostAddress() + ":14820"; + } + } catch (IOException e) { + } finally { + if (s != null) { + try { + s.close(); + } catch (IOException e) { + } + } + } + return null; + } +} diff --git a/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.DomainRegistryFactory b/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.DomainRegistryFactory index 2dfacf1f82..4b461bceca 100644 --- a/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.DomainRegistryFactory +++ b/sca-java-2.x/trunk/modules/domain-hazelcast/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.DomainRegistryFactory @@ -14,5 +14,5 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -org.apache.tuscany.sca.endpoint.hazelcast.HazelcastDomainRegistryFactory;ranking=100 -org.apache.tuscany.sca.endpoint.hazelcast.client.HazelcastClientDomainRegistryFactory;ranking=100 +org.apache.tuscany.sca.registry.hazelcast.HazelcastDomainRegistryFactory;ranking=100 +org.apache.tuscany.sca.registry.hazelcast.client.HazelcastClientDomainRegistryFactory;ranking=100 diff --git a/sca-java-2.x/trunk/modules/domain-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java b/sca-java-2.x/trunk/modules/domain-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java deleted file mode 100644 index 65926dddb1..0000000000 --- a/sca-java-2.x/trunk/modules/domain-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.endpoint.hazelcast; - -import java.util.Properties; - -import org.apache.tuscany.sca.assembly.AssemblyFactory; -import org.apache.tuscany.sca.assembly.Binding; -import org.apache.tuscany.sca.assembly.Component; -import org.apache.tuscany.sca.assembly.Endpoint; -import org.apache.tuscany.sca.assembly.SCABindingFactory; -import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry; -import org.apache.tuscany.sca.core.ExtensionPointRegistry; -import org.apache.tuscany.sca.core.FactoryExtensionPoint; -import org.apache.tuscany.sca.core.UtilityExtensionPoint; -import org.apache.tuscany.sca.interfacedef.Interface; -import org.apache.tuscany.sca.interfacedef.InterfaceContract; -import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract; -import org.apache.tuscany.sca.runtime.RuntimeEndpoint; -import org.apache.tuscany.sca.runtime.RuntimeProperties; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -public class MultiRegTestCase { - private static ExtensionPointRegistry extensionPoints; - private static AssemblyFactory assemblyFactory; - private static SCABindingFactory scaBindingFactory; - - @BeforeClass - public static void init() { - extensionPoints = new DefaultExtensionPointRegistry(); - FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class); - assemblyFactory = factories.getFactory(AssemblyFactory.class); - scaBindingFactory = factories.getFactory(SCABindingFactory.class); - Properties properties = extensionPoints.getExtensionPoint(UtilityExtensionPoint.class).getUtility(RuntimeProperties.class).getProperties(); - properties.setProperty("bind", "127.0.0.1"); - } - - @Test - public void testReplication() throws Exception { - - System.out.println("Starting reg1"); - HazelcastDomainRegistry reg1 = new HazelcastDomainRegistry(extensionPoints, (Properties)null, "tuscany:foo?bind=127.0.0.1:9876&multicast=off", "bar"); - reg1.start(); - - System.out.println("Adding ep1"); - RuntimeEndpoint ep1 = createEndpoint("ep1uri"); - ep1.bind(extensionPoints, reg1); - reg1.addEndpoint(ep1); - - System.out.println("Starting reg3"); - HazelcastDomainRegistry reg2 = new HazelcastDomainRegistry(extensionPoints, (Properties)null, "tuscany:foo?bind=127.0.0.1:9877&multicast=off&wka=127.0.0.1:9876", "bar"); - reg2.start(); - - System.out.println("Starting reg2"); - HazelcastDomainRegistry reg3 = new HazelcastDomainRegistry(extensionPoints, (Properties)null, "tuscany:foo?bind=127.0.0.1:9878&multicast=off&wka=127.0.0.1:9877", "bar"); - reg3.start(); - - assertExists(reg1, "ep1uri"); - assertExists(reg2, "ep1uri"); - assertExists(reg3, "ep1uri"); - - System.out.println("Adding ep2"); - RuntimeEndpoint ep2 = createEndpoint("ep2uri"); - ep2.bind(extensionPoints, reg2); - reg2.addEndpoint(ep2); - - assertExists(reg2, "ep2uri"); - assertExists(reg1, "ep2uri"); - assertExists(reg3, "ep2uri"); - - System.out.println("Stopping reg1"); - reg1.stop(); - System.out.println("Stopped reg1"); - Thread.sleep(500); - - Assert.assertNull(reg2.getEndpoint("ep1uri")); - Assert.assertNull(reg3.getEndpoint("ep1uri")); - - assertExists(reg2, "ep2uri"); - assertExists(reg3, "ep2uri"); - - System.out.println("Starting reg1"); - reg1.start(); - ep1.bind(extensionPoints, reg1); - - System.out.println("adding ep1"); - reg1.addEndpoint(ep1); - assertExists(reg1, "ep1uri"); - assertExists(reg2, "ep1uri"); - assertExists(reg3, "ep1uri"); - - System.out.println("Stopping reg1"); - reg1.stop(); - System.out.println("Stopping reg2"); - reg2.stop(); - System.out.println("Stopping reg3"); - reg3.stop(); - System.out.println("done"); - } - - @Test - public void testDuplicates() throws Exception { - HazelcastDomainRegistry reg1 = new HazelcastDomainRegistry(extensionPoints, (Properties)null, "tuscany:foo?bind=127.0.0.1:9876&multicast=off", "bar"); - reg1.start(); - RuntimeEndpoint ep1 = createEndpoint("ep1uri"); - ep1.bind(extensionPoints, reg1); - reg1.addEndpoint(ep1); - - HazelcastDomainRegistry reg2 = new HazelcastDomainRegistry(extensionPoints, (Properties)null, "tuscany:foo?bind=127.0.0.1:9877&multicast=off&wka=127.0.0.1:9876", "bar"); - reg2.start(); - - try { - reg2.addEndpoint(ep1); - Assert.fail(); - } catch (IllegalStateException e) { - // expected - } - - reg1.stop(); - - Thread.sleep(200); - - // now it should work - reg2.addEndpoint(ep1); - - reg2.stop(); - } - - private Endpoint assertExists(HazelcastDomainRegistry reg, String uri) throws InterruptedException { - Endpoint ep = reg.getEndpoint(uri); - Assert.assertNotNull(ep); - Assert.assertEquals(uri, ep.getURI()); - return ep; - } - - private RuntimeEndpoint createEndpoint(String uri) { - RuntimeEndpoint ep = (RuntimeEndpoint) assemblyFactory.createEndpoint(); - Component comp = assemblyFactory.createComponent(); - ep.setComponent(comp); - ep.setService(assemblyFactory.createComponentService()); - ep.getService().setInterfaceContract(getIC()); - Binding b = scaBindingFactory.createSCABinding(); - ep.setBinding(b); - ep.setURI(uri); - return ep; - } - - private InterfaceContract getIC() { - InterfaceContract ic = new JavaInterfaceContract(){ - - public Object clone() throws CloneNotSupportedException { - return null; - } - @Override - public Interface getInterface() { - // TODO Auto-generated method stub - return null; - } - - @Override - public void setInterface(Interface callInterface) { - // TODO Auto-generated method stub - - } - - @Override - public Interface getCallbackInterface() { - // TODO Auto-generated method stub - return null; - } - - @Override - public void setCallbackInterface(Interface callbackInterface) { - // TODO Auto-generated method stub - - } - - @Override - public InterfaceContract makeUnidirectional(boolean isCallback) { - // TODO Auto-generated method stub - return null; - } - - @Override - public InterfaceContract getNormalizedWSDLContract() { - // TODO Auto-generated method stub - return null; - } - - @Override - public void setNormailizedWSDLContract(InterfaceContract wsdlInterfaceContract) { - // TODO Auto-generated method stub - - }}; - return ic; - } - -} diff --git a/sca-java-2.x/trunk/modules/domain-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/RegistryTestCase.java b/sca-java-2.x/trunk/modules/domain-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/RegistryTestCase.java deleted file mode 100644 index 12b420f010..0000000000 --- a/sca-java-2.x/trunk/modules/domain-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/RegistryTestCase.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.endpoint.hazelcast; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.List; - -import junit.framework.Assert; - -import org.junit.Ignore; -import org.junit.Test; - -import com.hazelcast.config.Config; -import com.hazelcast.config.TcpIpConfig; -import com.hazelcast.config.XmlConfigBuilder; -import com.hazelcast.core.Hazelcast; -import com.hazelcast.core.HazelcastInstance; -import com.hazelcast.core.IMap; -import com.hazelcast.nio.Address; - -@Ignore -public class RegistryTestCase { - - @Test - public void test1() throws UnknownHostException { - - HazelcastInstance h1 = create("54327", 9001); - - IMap h1map = h1.getMap("mymap"); - h1map.put("key1", "bla1"); - Assert.assertEquals("bla1", h1map.get("key1")); - - HazelcastInstance h2 = create("false", 9002, 9001); - IMap h2map = h2.getMap("mymap"); - Assert.assertEquals("bla1", h2map.get("key1")); - - HazelcastInstance h3 = create("false", 9003, 9002); - IMap h3map = h3.getMap("mymap"); - Assert.assertEquals("bla1", h3map.get("key1")); - - h3map.put("k3", "v3"); - h2map.put("k2", "v2"); - - Assert.assertEquals("v2", h1map.get("k2")); - Assert.assertEquals("v3", h1map.get("k3")); - Assert.assertEquals("v2", h2map.get("k2")); - Assert.assertEquals("v3", h2map.get("k3")); - Assert.assertEquals("v2", h3map.get("k2")); - Assert.assertEquals("v3", h3map.get("k3")); - - HazelcastInstance h4 = create("54328", 9004, 9001); - IMap h4map = h4.getMap("mymap"); -// Assert.assertNull(h4map.get("k2")); -// Assert.assertNull(h4map.get("k3")); - Assert.assertEquals("v2", h4map.get("k2")); - Assert.assertEquals("v3", h4map.get("k3")); - -// HazelcastInstance h5 = create("false", 9005, 9003, 9004); - HazelcastInstance h5 = create("54328", 9005); - -// Assert.assertEquals("v2", h4map.get("k2")); -// Assert.assertEquals("v3", h4map.get("k3")); - - IMap h5map = h5.getMap("mymap"); - Assert.assertEquals("v2", h5map.get("k2")); - Assert.assertEquals("v3", h5map.get("k3")); - - h1.shutdown(); - - Assert.assertEquals("v2", h2map.get("k2")); - Assert.assertEquals("v3", h2map.get("k3")); - Assert.assertEquals("v2", h3map.get("k2")); - Assert.assertEquals("v3", h3map.get("k3")); - Assert.assertEquals("v2", h4map.get("k2")); - Assert.assertEquals("v3", h4map.get("k3")); - - h3map.put("key1a", "bla1a"); - - Assert.assertEquals("bla1a", h2map.get("key1a")); - Assert.assertEquals("bla1a", h3map.get("key1a")); - Assert.assertEquals("bla1a", h4map.get("key1a")); - -// HazelcastInstance h4 = create(true, 9004, 9003); -// HazelcastInstance h5 = create(true, 9005); -// IMap h5map = h5.getMap("mymap"); -// Assert.assertEquals("bla1", h5map.get("key1")); - -// HazelcastInstance h6 = create(false, 9006, 9005); -// IMap h6map = h6.getMap("mymap"); -// Assert.assertEquals("bla1", h6map.get("key1")); - - } - - private HazelcastInstance create(String multicast, int listenPort, int... connectPorts) throws UnknownHostException { - Config config = new XmlConfigBuilder().build(); - config.setPort(listenPort); - config.setPortAutoIncrement(false); - - // declare the interface Hazelcast should bind to - config.getNetworkConfig().getInterfaces().clear(); - config.getNetworkConfig().getInterfaces().addInterface(InetAddress.getLocalHost().getHostAddress()); - config.getNetworkConfig().getInterfaces().setEnabled(true); - - if ("false".equals(multicast)) { - config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false); - } else { - config.getNetworkConfig().getJoin().getMulticastConfig().setMulticastPort(Integer.parseInt(multicast)); - } - - if (connectPorts.length > 0) { - TcpIpConfig tcpconfig = config.getNetworkConfig().getJoin().getTcpIpConfig(); - tcpconfig.setEnabled(true); - - List
lsMembers = tcpconfig.getAddresses(); - lsMembers.clear(); - for (int p : connectPorts) { - lsMembers.add(new Address(InetAddress.getLocalHost(), p)); - } - } - - return Hazelcast.newHazelcastInstance(config); - } - -} diff --git a/sca-java-2.x/trunk/modules/domain-hazelcast/src/test/java/org/apache/tuscany/sca/registry/hazelcast/MultiRegTestCase.java b/sca-java-2.x/trunk/modules/domain-hazelcast/src/test/java/org/apache/tuscany/sca/registry/hazelcast/MultiRegTestCase.java new file mode 100644 index 0000000000..59845bcb32 --- /dev/null +++ b/sca-java-2.x/trunk/modules/domain-hazelcast/src/test/java/org/apache/tuscany/sca/registry/hazelcast/MultiRegTestCase.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.registry.hazelcast; + +import java.util.Properties; + +import org.apache.tuscany.sca.assembly.AssemblyFactory; +import org.apache.tuscany.sca.assembly.Binding; +import org.apache.tuscany.sca.assembly.Component; +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.assembly.SCABindingFactory; +import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.FactoryExtensionPoint; +import org.apache.tuscany.sca.core.UtilityExtensionPoint; +import org.apache.tuscany.sca.interfacedef.Interface; +import org.apache.tuscany.sca.interfacedef.InterfaceContract; +import org.apache.tuscany.sca.interfacedef.java.JavaInterfaceContract; +import org.apache.tuscany.sca.registry.hazelcast.HazelcastDomainRegistry; +import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.apache.tuscany.sca.runtime.RuntimeProperties; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class MultiRegTestCase { + private static ExtensionPointRegistry extensionPoints; + private static AssemblyFactory assemblyFactory; + private static SCABindingFactory scaBindingFactory; + + @BeforeClass + public static void init() { + extensionPoints = new DefaultExtensionPointRegistry(); + FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class); + assemblyFactory = factories.getFactory(AssemblyFactory.class); + scaBindingFactory = factories.getFactory(SCABindingFactory.class); + Properties properties = extensionPoints.getExtensionPoint(UtilityExtensionPoint.class).getUtility(RuntimeProperties.class).getProperties(); + properties.setProperty("bind", "127.0.0.1"); + } + + @Test + public void testReplication() throws Exception { + + System.out.println("Starting reg1"); + HazelcastDomainRegistry reg1 = new HazelcastDomainRegistry(extensionPoints, (Properties)null, "tuscany:foo?bind=127.0.0.1:9876&multicast=off", "bar"); + reg1.start(); + + System.out.println("Adding ep1"); + RuntimeEndpoint ep1 = createEndpoint("ep1uri"); + ep1.bind(extensionPoints, reg1); + reg1.addEndpoint(ep1); + + System.out.println("Starting reg3"); + HazelcastDomainRegistry reg2 = new HazelcastDomainRegistry(extensionPoints, (Properties)null, "tuscany:foo?bind=127.0.0.1:9877&multicast=off&wka=127.0.0.1:9876", "bar"); + reg2.start(); + + System.out.println("Starting reg2"); + HazelcastDomainRegistry reg3 = new HazelcastDomainRegistry(extensionPoints, (Properties)null, "tuscany:foo?bind=127.0.0.1:9878&multicast=off&wka=127.0.0.1:9877", "bar"); + reg3.start(); + + assertExists(reg1, "ep1uri"); + assertExists(reg2, "ep1uri"); + assertExists(reg3, "ep1uri"); + + System.out.println("Adding ep2"); + RuntimeEndpoint ep2 = createEndpoint("ep2uri"); + ep2.bind(extensionPoints, reg2); + reg2.addEndpoint(ep2); + + assertExists(reg2, "ep2uri"); + assertExists(reg1, "ep2uri"); + assertExists(reg3, "ep2uri"); + + System.out.println("Stopping reg1"); + reg1.stop(); + System.out.println("Stopped reg1"); + Thread.sleep(500); + + Assert.assertNull(reg2.getEndpoint("ep1uri")); + Assert.assertNull(reg3.getEndpoint("ep1uri")); + + assertExists(reg2, "ep2uri"); + assertExists(reg3, "ep2uri"); + + System.out.println("Starting reg1"); + reg1.start(); + ep1.bind(extensionPoints, reg1); + + System.out.println("adding ep1"); + reg1.addEndpoint(ep1); + assertExists(reg1, "ep1uri"); + assertExists(reg2, "ep1uri"); + assertExists(reg3, "ep1uri"); + + System.out.println("Stopping reg1"); + reg1.stop(); + System.out.println("Stopping reg2"); + reg2.stop(); + System.out.println("Stopping reg3"); + reg3.stop(); + System.out.println("done"); + } + + @Test + public void testDuplicates() throws Exception { + HazelcastDomainRegistry reg1 = new HazelcastDomainRegistry(extensionPoints, (Properties)null, "tuscany:foo?bind=127.0.0.1:9876&multicast=off", "bar"); + reg1.start(); + RuntimeEndpoint ep1 = createEndpoint("ep1uri"); + ep1.bind(extensionPoints, reg1); + reg1.addEndpoint(ep1); + + HazelcastDomainRegistry reg2 = new HazelcastDomainRegistry(extensionPoints, (Properties)null, "tuscany:foo?bind=127.0.0.1:9877&multicast=off&wka=127.0.0.1:9876", "bar"); + reg2.start(); + + try { + reg2.addEndpoint(ep1); + Assert.fail(); + } catch (IllegalStateException e) { + // expected + } + + reg1.stop(); + + Thread.sleep(200); + + // now it should work + reg2.addEndpoint(ep1); + + reg2.stop(); + } + + private Endpoint assertExists(HazelcastDomainRegistry reg, String uri) throws InterruptedException { + Endpoint ep = reg.getEndpoint(uri); + Assert.assertNotNull(ep); + Assert.assertEquals(uri, ep.getURI()); + return ep; + } + + private RuntimeEndpoint createEndpoint(String uri) { + RuntimeEndpoint ep = (RuntimeEndpoint) assemblyFactory.createEndpoint(); + Component comp = assemblyFactory.createComponent(); + ep.setComponent(comp); + ep.setService(assemblyFactory.createComponentService()); + ep.getService().setInterfaceContract(getIC()); + Binding b = scaBindingFactory.createSCABinding(); + ep.setBinding(b); + ep.setURI(uri); + return ep; + } + + private InterfaceContract getIC() { + InterfaceContract ic = new JavaInterfaceContract(){ + + public Object clone() throws CloneNotSupportedException { + return null; + } + @Override + public Interface getInterface() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setInterface(Interface callInterface) { + // TODO Auto-generated method stub + + } + + @Override + public Interface getCallbackInterface() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setCallbackInterface(Interface callbackInterface) { + // TODO Auto-generated method stub + + } + + @Override + public InterfaceContract makeUnidirectional(boolean isCallback) { + // TODO Auto-generated method stub + return null; + } + + @Override + public InterfaceContract getNormalizedWSDLContract() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void setNormailizedWSDLContract(InterfaceContract wsdlInterfaceContract) { + // TODO Auto-generated method stub + + }}; + return ic; + } + +} diff --git a/sca-java-2.x/trunk/modules/domain-hazelcast/src/test/java/org/apache/tuscany/sca/registry/hazelcast/RegistryTestCase.java b/sca-java-2.x/trunk/modules/domain-hazelcast/src/test/java/org/apache/tuscany/sca/registry/hazelcast/RegistryTestCase.java new file mode 100644 index 0000000000..934c77484e --- /dev/null +++ b/sca-java-2.x/trunk/modules/domain-hazelcast/src/test/java/org/apache/tuscany/sca/registry/hazelcast/RegistryTestCase.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.registry.hazelcast; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.List; + +import junit.framework.Assert; + +import org.junit.Ignore; +import org.junit.Test; + +import com.hazelcast.config.Config; +import com.hazelcast.config.TcpIpConfig; +import com.hazelcast.config.XmlConfigBuilder; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.core.IMap; +import com.hazelcast.nio.Address; + +@Ignore +public class RegistryTestCase { + + @Test + public void test1() throws UnknownHostException { + + HazelcastInstance h1 = create("54327", 9001); + + IMap h1map = h1.getMap("mymap"); + h1map.put("key1", "bla1"); + Assert.assertEquals("bla1", h1map.get("key1")); + + HazelcastInstance h2 = create("false", 9002, 9001); + IMap h2map = h2.getMap("mymap"); + Assert.assertEquals("bla1", h2map.get("key1")); + + HazelcastInstance h3 = create("false", 9003, 9002); + IMap h3map = h3.getMap("mymap"); + Assert.assertEquals("bla1", h3map.get("key1")); + + h3map.put("k3", "v3"); + h2map.put("k2", "v2"); + + Assert.assertEquals("v2", h1map.get("k2")); + Assert.assertEquals("v3", h1map.get("k3")); + Assert.assertEquals("v2", h2map.get("k2")); + Assert.assertEquals("v3", h2map.get("k3")); + Assert.assertEquals("v2", h3map.get("k2")); + Assert.assertEquals("v3", h3map.get("k3")); + + HazelcastInstance h4 = create("54328", 9004, 9001); + IMap h4map = h4.getMap("mymap"); +// Assert.assertNull(h4map.get("k2")); +// Assert.assertNull(h4map.get("k3")); + Assert.assertEquals("v2", h4map.get("k2")); + Assert.assertEquals("v3", h4map.get("k3")); + +// HazelcastInstance h5 = create("false", 9005, 9003, 9004); + HazelcastInstance h5 = create("54328", 9005); + +// Assert.assertEquals("v2", h4map.get("k2")); +// Assert.assertEquals("v3", h4map.get("k3")); + + IMap h5map = h5.getMap("mymap"); + Assert.assertEquals("v2", h5map.get("k2")); + Assert.assertEquals("v3", h5map.get("k3")); + + h1.shutdown(); + + Assert.assertEquals("v2", h2map.get("k2")); + Assert.assertEquals("v3", h2map.get("k3")); + Assert.assertEquals("v2", h3map.get("k2")); + Assert.assertEquals("v3", h3map.get("k3")); + Assert.assertEquals("v2", h4map.get("k2")); + Assert.assertEquals("v3", h4map.get("k3")); + + h3map.put("key1a", "bla1a"); + + Assert.assertEquals("bla1a", h2map.get("key1a")); + Assert.assertEquals("bla1a", h3map.get("key1a")); + Assert.assertEquals("bla1a", h4map.get("key1a")); + +// HazelcastInstance h4 = create(true, 9004, 9003); +// HazelcastInstance h5 = create(true, 9005); +// IMap h5map = h5.getMap("mymap"); +// Assert.assertEquals("bla1", h5map.get("key1")); + +// HazelcastInstance h6 = create(false, 9006, 9005); +// IMap h6map = h6.getMap("mymap"); +// Assert.assertEquals("bla1", h6map.get("key1")); + + } + + private HazelcastInstance create(String multicast, int listenPort, int... connectPorts) throws UnknownHostException { + Config config = new XmlConfigBuilder().build(); + config.setPort(listenPort); + config.setPortAutoIncrement(false); + + // declare the interface Hazelcast should bind to + config.getNetworkConfig().getInterfaces().clear(); + config.getNetworkConfig().getInterfaces().addInterface(InetAddress.getLocalHost().getHostAddress()); + config.getNetworkConfig().getInterfaces().setEnabled(true); + + if ("false".equals(multicast)) { + config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false); + } else { + config.getNetworkConfig().getJoin().getMulticastConfig().setMulticastPort(Integer.parseInt(multicast)); + } + + if (connectPorts.length > 0) { + TcpIpConfig tcpconfig = config.getNetworkConfig().getJoin().getTcpIpConfig(); + tcpconfig.setEnabled(true); + + List
lsMembers = tcpconfig.getAddresses(); + lsMembers.clear(); + for (int p : connectPorts) { + lsMembers.add(new Address(InetAddress.getLocalHost(), p)); + } + } + + return Hazelcast.newHazelcastInstance(config); + } + +} -- cgit v1.2.3