From cb17099e7fb5f31435e57e1ee2c60f85b06e0868 Mon Sep 17 00:00:00 2001 From: antelder Date: Thu, 31 Dec 2009 07:59:29 +0000 Subject: Have a look at using ehcache in an endpoint registry impl. In sandbaox for now to avoid yet another impl that doesnt work properly in trunk. git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@894790 13f79535-47bb-0310-9956-ffa450edef68 --- sandbox/endpoint-ehcache/META-INF/MANIFEST.MF | 28 + sandbox/endpoint-ehcache/pom.xml | 74 ++ .../TuscanyRMICacheManagerPeerListener.java | 602 +++++++++++++++++ .../TuscanyRMICacheManagerPeerListenerFactory.java | 113 ++++ ...org.apache.tuscany.sca.runtime.EndpointRegistry | 17 + .../sca/endpoint/tribes/RegistryTestCase.java | 193 ++++++ .../src/test/resources/ehcache.xml | 741 +++++++++++++++++++++ .../src/test/resources/ehcache1.xml | 46 ++ .../src/test/resources/ehcache2.xml | 48 ++ 9 files changed, 1862 insertions(+) create mode 100644 sandbox/endpoint-ehcache/META-INF/MANIFEST.MF create mode 100644 sandbox/endpoint-ehcache/pom.xml create mode 100644 sandbox/endpoint-ehcache/src/main/java/org/apache/tuscany/sca/endpoint/ehcache/TuscanyRMICacheManagerPeerListener.java create mode 100644 sandbox/endpoint-ehcache/src/main/java/org/apache/tuscany/sca/endpoint/ehcache/TuscanyRMICacheManagerPeerListenerFactory.java create mode 100644 sandbox/endpoint-ehcache/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.EndpointRegistry create mode 100644 sandbox/endpoint-ehcache/src/test/java/org/apache/tuscany/sca/endpoint/tribes/RegistryTestCase.java create mode 100644 sandbox/endpoint-ehcache/src/test/resources/ehcache.xml create mode 100644 sandbox/endpoint-ehcache/src/test/resources/ehcache1.xml create mode 100644 sandbox/endpoint-ehcache/src/test/resources/ehcache2.xml (limited to 'sandbox/endpoint-ehcache') diff --git a/sandbox/endpoint-ehcache/META-INF/MANIFEST.MF b/sandbox/endpoint-ehcache/META-INF/MANIFEST.MF new file mode 100644 index 0000000000..06df55ef38 --- /dev/null +++ b/sandbox/endpoint-ehcache/META-INF/MANIFEST.MF @@ -0,0 +1,28 @@ +Manifest-Version: 1.0 +Private-Package: org.apache.tuscany.sca.xsd.impl;version="2.0.0" +SCA-Version: 1.1 +Bundle-Name: Apache Tuscany SCA Tomcat Tribes Based EndPoint Registry +Bundle-Vendor: The Apache Software Foundation +Bundle-Version: 2.0.0 +Bundle-ManifestVersion: 2 +Bundle-License: http://www.apache.org/licenses/LICENSE-2.0.txt +Bundle-Description: Apache Tuscany SCA XSD Model +Bundle-SymbolicName: org.apache.tuscany.sca.endpoint.tribes +Bundle-DocURL: http://www.apache.org/ +Bundle-RequiredExecutionEnvironment: J2SE-1.5,JavaSE-1.6 +Import-Package: org.apache.catalina.tribes, + org.apache.catalina.tribes.group, + org.apache.catalina.tribes.group.interceptors, + org.apache.catalina.tribes.io, + org.apache.catalina.tribes.membership, + org.apache.catalina.tribes.tipis, + org.apache.catalina.tribes.transport, + org.apache.catalina.tribes.util, + org.apache.juli.logging;resolution:=optional, + org.apache.tuscany.sca.assembly;version="2.0.0", + org.apache.tuscany.sca.core;version="2.0.0", + org.apache.tuscany.sca.core.assembly.impl;scope=internal;version="2.0.0";resolution:=optional, + org.apache.tuscany.sca.management;version="2.0.0", + org.apache.tuscany.sca.policy;version="2.0.0", + org.apache.tuscany.sca.runtime;version="2.0.0" +Export-Package: org.apache.tuscany.sca.endpoint.tribes;version="2.0.0" diff --git a/sandbox/endpoint-ehcache/pom.xml b/sandbox/endpoint-ehcache/pom.xml new file mode 100644 index 0000000000..4582a09916 --- /dev/null +++ b/sandbox/endpoint-ehcache/pom.xml @@ -0,0 +1,74 @@ + + + + 4.0.0 + + org.apache.tuscany.sca + tuscany-modules + 2.0-SNAPSHOT + ../pom.xml + + tuscany-endpoint-echache + Apache Tuscany SCA EndPoint Registry using Ehcache + + + + sourceforge + http://oss.sonatype.org/content/groups/sourceforge/ + + true + + + + + + + net.sf.ehcache + ehcache-core + 1.7.0 + compile + + + org.apache.tuscany.sca + tuscany-core-spi + 2.0-SNAPSHOT + compile + + + org.apache.tuscany.sca + tuscany-core + 2.0-SNAPSHOT + test + + + org.apache.tuscany.sca + tuscany-deployment + 2.0-SNAPSHOT + test + + + org.apache.tuscany.sca + tuscany-implementation-java-runtime + 2.0-SNAPSHOT + test + + + + diff --git a/sandbox/endpoint-ehcache/src/main/java/org/apache/tuscany/sca/endpoint/ehcache/TuscanyRMICacheManagerPeerListener.java b/sandbox/endpoint-ehcache/src/main/java/org/apache/tuscany/sca/endpoint/ehcache/TuscanyRMICacheManagerPeerListener.java new file mode 100644 index 0000000000..35b19af5eb --- /dev/null +++ b/sandbox/endpoint-ehcache/src/main/java/org/apache/tuscany/sca/endpoint/ehcache/TuscanyRMICacheManagerPeerListener.java @@ -0,0 +1,602 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.endpoint.ehcache; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.UnknownHostException; +import java.rmi.Naming; +import java.rmi.NotBoundException; +import java.rmi.Remote; +import java.rmi.RemoteException; +import java.rmi.registry.LocateRegistry; +import java.rmi.registry.Registry; +import java.rmi.server.ExportException; +import java.rmi.server.UnicastRemoteObject; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +import net.sf.ehcache.CacheException; +import net.sf.ehcache.CacheManager; +import net.sf.ehcache.Ehcache; +import net.sf.ehcache.Status; +import net.sf.ehcache.distribution.CacheManagerPeerListener; +import net.sf.ehcache.distribution.CacheReplicator; +import net.sf.ehcache.distribution.RMICacheManagerPeerListener; +import net.sf.ehcache.distribution.RMICachePeer; +import net.sf.ehcache.event.CacheEventListener; + +/** + * A cache server which exposes available cache operations remotely through RMI. + *

+ * It acts as a Decorator to a Cache. It holds an instance of cache, which is a local cache it talks to. + *

+ * This class could specify a security manager with code like: + *

+ * if (System.getSecurityManager() == null) {
+ *     System.setSecurityManager(new RMISecurityManager());
+ * }
+ * 
+ * Doing so would require the addition of grant statements in the java.policy file. + *

+ * Per the JDK documentation: "If no security manager is specified no class loading, by RMI clients or servers, is allowed, + * aside from what can be found in the local CLASSPATH." The classpath of each instance of this class should have + * all required classes to enable distribution, so no remote classloading is required or desirable. Accordingly, + * no security manager is set and there are no special JVM configuration requirements. + *

+ * This class opens a ServerSocket. The dispose method should be called for orderly closure of that socket. This class + * has a shutdown hook which calls dispose() as a convenience feature for developers. + * + * @author Greg Luck + * @version $Id: RMICacheManagerPeerListener.java 1012 2009-08-20 04:23:00Z gregluck $ + */ +public class TuscanyRMICacheManagerPeerListener implements CacheManagerPeerListener { + + private static final Logger LOG = Logger.getLogger(TuscanyRMICacheManagerPeerListener.class.getName()); + private static final int MINIMUM_SENSIBLE_TIMEOUT = 200; + private static final int NAMING_UNBIND_RETRY_INTERVAL = 400; + private static final int NAMING_UNBIND_MAX_RETRIES = 10; + + /** + * The cache peers. The value is an RMICachePeer. + */ + protected final Map cachePeers = new HashMap(); + + /** + * status. + */ + protected Status status; + + /** + * The RMI listener port + */ + protected Integer port; + + private Registry registry; + private boolean registryCreated; + private final String hostName; + + private CacheManager cacheManager; + private Integer socketTimeoutMillis; + private Integer remoteObjectPort; + + /** + * Constructor with full arguments. + * + * @param hostName may be null, in which case the hostName will be looked up. Machines with multiple + * interfaces should specify this if they do not want it to be the default NIC. + * @param port a port in the range 1025 - 65536 + * @param remoteObjectPort the port number on which the remote objects bound in the registry receive calls. + This defaults to a free port if not specified. + * @param cacheManager the CacheManager this listener belongs to + * @param socketTimeoutMillis TCP/IP Socket timeout when waiting on response + */ + public TuscanyRMICacheManagerPeerListener(String hostName, Integer port, Integer remoteObjectPort, CacheManager cacheManager, + Integer socketTimeoutMillis) throws UnknownHostException { + + status = Status.STATUS_UNINITIALISED; + + if (hostName != null && hostName.length() != 0) { + this.hostName = hostName; + if (hostName.equals("localhost")) { + LOG.log(Level.WARNING, "Explicitly setting the listener hostname to 'localhost' is not recommended. " + + "It will only work if all CacheManager peers are on the same machine."); + } + } else { + this.hostName = calculateHostAddress(); + } + if (port == null || port.intValue() == 0) { + assignFreePort(false); + } else { + this.port = port; + } + + //by default is 0, which is ok. + this.remoteObjectPort = remoteObjectPort; + + this.cacheManager = cacheManager; + if (socketTimeoutMillis == null || socketTimeoutMillis.intValue() < MINIMUM_SENSIBLE_TIMEOUT) { + throw new IllegalArgumentException("socketTimoutMillis must be a reasonable value greater than 200ms"); + } + this.socketTimeoutMillis = socketTimeoutMillis; + + } + + /** + * Assigns a free port to be the listener port. + * + * @throws IllegalStateException if the statis of the listener is not {@link net.sf.ehcache.Status#STATUS_UNINITIALISED} + */ + protected void assignFreePort(boolean forced) throws IllegalStateException { + if (status != Status.STATUS_UNINITIALISED) { + throw new IllegalStateException("Cannot change the port of an already started listener."); + } + this.port = new Integer(this.getFreePort()); + if (forced) { + LOG.log(Level.WARNING, "Resolving RMI port conflict by automatically using a free TCP/IP port to listen on: " + this.port); + } else { + LOG.log(Level.FINE, "Automatically finding a free TCP/IP port to listen on: " + this.port); + } + } + + + /** + * Calculates the host address as the default NICs IP address + * + * @throws UnknownHostException + */ + protected String calculateHostAddress() throws UnknownHostException { + return InetAddress.getLocalHost().getHostAddress(); + } + + + /** + * Gets a free server socket port. + * + * @return a number in the range 1025 - 65536 that was free at the time this method was executed + * @throws IllegalArgumentException + */ + protected int getFreePort() throws IllegalArgumentException { + ServerSocket serverSocket = null; + try { + serverSocket = new ServerSocket(0); + return serverSocket.getLocalPort(); + } catch (IOException e) { + throw new IllegalArgumentException("Could not acquire a free port number."); + } finally { + if (serverSocket != null && !serverSocket.isClosed()) { + try { + serverSocket.close(); + } catch (Exception e) { + LOG.log(Level.FINE, "Error closing ServerSocket: " + e.getMessage()); + } + } + } + } + + + /** + * {@inheritDoc} + */ + public void init() throws CacheException { + if (!status.equals(Status.STATUS_UNINITIALISED)) { + return; + } + RMICachePeer rmiCachePeer = null; + try { + startRegistry(); + int counter = 0; + populateListOfRemoteCachePeers(); + synchronized (cachePeers) { + for (Iterator iterator = cachePeers.values().iterator(); iterator.hasNext();) { + rmiCachePeer = (RMICachePeer) iterator.next(); + bind(rmiCachePeer.getUrl(), rmiCachePeer); + counter++; + } + } + LOG.log(Level.FINE, counter + " RMICachePeers bound in registry for RMI listener"); + status = Status.STATUS_ALIVE; + } catch (Exception e) { + String url = null; + if (rmiCachePeer != null) { + url = rmiCachePeer.getUrl(); + } + + throw new CacheException("Problem starting listener for RMICachePeer " + + url + ". Initial cause was " + e.getMessage(), e); + } + } + + /** + * Bind a cache peer + * + * @param rmiCachePeer + */ + protected void bind(String peerName, RMICachePeer rmiCachePeer) throws Exception { + Naming.rebind(peerName, rmiCachePeer); + } + + /** + * Returns a list of bound objects. + *

+ * This should match the list of cachePeers i.e. they should always be bound + * + * @return a list of String representations of RMICachePeer objects + */ + protected String[] listBoundRMICachePeers() throws CacheException { + try { + return registry.list(); + } catch (RemoteException e) { + throw new CacheException("Unable to list cache peers " + e.getMessage()); + } + } + + /** + * Returns a reference to the remote object. + * + * @param name the name of the cache e.g. sampleCache1 + */ + protected Remote lookupPeer(String name) throws CacheException { + try { + return registry.lookup(name); + } catch (Exception e) { + throw new CacheException("Unable to lookup peer for replicated cache " + name + " " + + e.getMessage()); + } + } + + /** + * Should be called on init because this is one of the last things that should happen on CacheManager startup. + */ + protected void populateListOfRemoteCachePeers() throws RemoteException { + String[] names = cacheManager.getCacheNames(); + for (int i = 0; i < names.length; i++) { + String name = names[i]; + Ehcache cache = cacheManager.getEhcache(name); + synchronized (cachePeers) { + if (cachePeers.get(name) == null) { + if (isDistributed(cache)) { + RMICachePeer peer = new RMICachePeer(cache, hostName, port, remoteObjectPort, socketTimeoutMillis); + cachePeers.put(name, peer); + } + } + } + } + + } + + /** + * Determine if the given cache is distributed. + * + * @param cache the cache to check + * @return true if a CacheReplicator is found in the listeners + */ + protected boolean isDistributed(Ehcache cache) { + Set listeners = cache.getCacheEventNotificationService().getCacheEventListeners(); + for (Iterator iterator = listeners.iterator(); iterator.hasNext();) { + CacheEventListener cacheEventListener = (CacheEventListener) iterator.next(); + if (cacheEventListener instanceof CacheReplicator) { + return true; + } + } + return false; + } + + /** + * Start the rmiregistry. + *

+ * The alternative is to use the rmiregistry binary, in which case: + *

    + *
  1. rmiregistry running + *
  2. -Djava.rmi.server.codebase="file:///Users/gluck/work/ehcache/build/classes/ file:///Users/gluck/work/ehcache/lib/commons-logging-1.0.4.jar" + *
+ * + * @throws RemoteException + */ + protected void startRegistry() throws RemoteException { + try { + registry = LocateRegistry.getRegistry(port.intValue()); + try { + //may not be created. Let's create it. + registry = LocateRegistry.createRegistry(port.intValue()); + registryCreated = true; + } catch (RemoteException e) { + registry.list(); + } + } catch (ExportException exception) { + LOG.log(Level.SEVERE, "Exception starting RMI registry. Error was " + exception.getMessage(), exception); + } + } + + /** + * Stop the rmiregistry if it was started by this class. + * + * @throws RemoteException + */ + protected void stopRegistry() throws RemoteException { + if (registryCreated) { + // the unexportObject call must be done on the Registry object returned + // by createRegistry not by getRegistry, a NoSuchObjectException is + // thrown otherwise + boolean success = UnicastRemoteObject.unexportObject(registry, true); + if (success) { + LOG.log(Level.FINE, "rmiregistry unexported."); + } else { + LOG.log(Level.WARNING, "Could not unexport rmiregistry."); + } + } + } + + /** + * Stop the listener. It + * + */ + public void dispose() throws CacheException { + if (!status.equals(Status.STATUS_ALIVE)) { + return; + } + try { + int counter = 0; + synchronized (cachePeers) { + for (Iterator iterator = cachePeers.values().iterator(); iterator.hasNext();) { + RMICachePeer rmiCachePeer = (RMICachePeer) iterator.next(); + disposeRMICachePeer(rmiCachePeer); + counter++; + } + stopRegistry(); + } + LOG.log(Level.FINE, counter + " RMICachePeers unbound from registry in RMI listener"); + status = Status.STATUS_SHUTDOWN; + } catch (Exception e) { + throw new CacheException("Problem unbinding remote cache peers. Initial cause was " + e.getMessage(), e); + } + } + + /** + * A template method to dispose an individual RMICachePeer. This consists of: + *
    + *
  1. Unbinding the peer from the naming service + *
  2. Unexporting the peer + *
+ * Override to specialise behaviour + * + * @param rmiCachePeer the cache peer to dispose of + * @throws Exception thrown if something goes wrong + */ + protected void disposeRMICachePeer(RMICachePeer rmiCachePeer) throws Exception { + unbind(rmiCachePeer); + } + + /** + * Unbinds an RMICachePeer and unexports it. + *

+ * We unbind from the registry first before unexporting. + * Unbinding first removes the very small possibility of a client + * getting the object from the registry while we are trying to unexport it. + *

+ * This method may take up to 4 seconds to complete, if we are having trouble + * unexporting the peer. + * + * @param rmiCachePeer the bound and exported cache peer + * @throws Exception + */ + protected void unbind(RMICachePeer rmiCachePeer) throws Exception { + String url = rmiCachePeer.getUrl(); + try { + Naming.unbind(url); + } catch (NotBoundException e) { + LOG.log(Level.WARNING, url + " not bound therefore not unbinding."); + } + // Try to gracefully unexport before forcing it. + boolean unexported = UnicastRemoteObject.unexportObject(rmiCachePeer, false); + for (int count = 1; (count < NAMING_UNBIND_MAX_RETRIES) && !unexported; count++) { + try { + Thread.sleep(NAMING_UNBIND_RETRY_INTERVAL); + } catch (InterruptedException ie) { + // break out of the unexportObject loop + break; + } + unexported = UnicastRemoteObject.unexportObject(rmiCachePeer, false); + } + + // If we still haven't been able to unexport, force the unexport + // as a last resort. + if (!unexported) { + if (!UnicastRemoteObject.unexportObject(rmiCachePeer, true)) { + LOG.log(Level.WARNING, "Unable to unexport rmiCachePeer: " + rmiCachePeer.getUrl() + ". Skipping."); + } + } + } + + /** + * All of the caches which are listening for remote changes. + * + * @return a list of RMICachePeer objects. The list if not live + */ + public List getBoundCachePeers() { + List cachePeerList = new ArrayList(); + synchronized (cachePeers) { + for (Iterator iterator = cachePeers.values().iterator(); iterator.hasNext();) { + RMICachePeer rmiCachePeer = (RMICachePeer) iterator.next(); + cachePeerList.add(rmiCachePeer); + } + } + return cachePeerList; + } + + /** + * Returns the listener status. + */ + public Status getStatus() { + return status; + } + + /** + * A listener will normally have a resource that only one instance can use at the same time, + * such as a port. This identifier is used to tell if it is unique and will not conflict with an + * existing instance using the resource. + * + * @return a String identifier for the resource + */ + public String getUniqueResourceIdentifier() { + return "RMI listener port: " + port; + } + + /** + * If a conflict is detected in unique resource use, this method signals the listener to attempt + * automatic resolution of the resource conflict. + * + * @throws IllegalStateException if the statis of the listener is not {@link net.sf.ehcache.Status#STATUS_UNINITIALISED} + */ + public void attemptResolutionOfUniqueResourceConflict() throws IllegalStateException, CacheException { + assignFreePort(true); + } + + /** + * The replication scheme this listener interacts with. + * Each peer provider has a scheme name, which can be used by caches to specify for replication and bootstrap purposes. + * + * @return the well-known scheme name, which is determined by the replication provider author. + */ + public String getScheme() { + return "RMI"; + } + + /** + * Called immediately after a cache has been added and activated. + *

+ * Note that the CacheManager calls this method from a synchronized method. Any attempt to call a synchronized + * method on CacheManager from this method will cause a deadlock. + *

+ * Note that activation will also cause a CacheEventListener status change notification from + * {@link net.sf.ehcache.Status#STATUS_UNINITIALISED} to {@link net.sf.ehcache.Status#STATUS_ALIVE}. Care should be + * taken on processing that notification because: + *

+ * The calling method will block until this method returns. + *

+ * Repopulates the list of cache peers and rebinds the list. + * This method should be called if a cache is dynamically added + * + * @param cacheName the name of the Cache the operation relates to + * @see net.sf.ehcache.event.CacheEventListener + */ + public void notifyCacheAdded(String cacheName) throws CacheException { + + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "Adding " + cacheName + " to RMI listener"); + } + + //Don't add if exists. + synchronized (cachePeers) { + if (cachePeers.get(cacheName) != null) { + return; + } + } + + Ehcache cache = cacheManager.getEhcache(cacheName); + if (isDistributed(cache)) { + RMICachePeer rmiCachePeer = null; + String url = null; + try { + rmiCachePeer = new RMICachePeer(cache, hostName, port, remoteObjectPort, socketTimeoutMillis); + url = rmiCachePeer.getUrl(); + bind(url, rmiCachePeer); + } catch (Exception e) { + throw new CacheException("Problem starting listener for RMICachePeer " + + url + ". Initial cause was " + e.getMessage(), e); + } + + synchronized (cachePeers) { + cachePeers.put(cacheName, rmiCachePeer); + } + + } + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, cachePeers.size() + " RMICachePeers bound in registry for RMI listener"); + } + } + + /** + * Called immediately after a cache has been disposed and removed. The calling method will block until + * this method returns. + *

+ * Note that the CacheManager calls this method from a synchronized method. Any attempt to call a synchronized + * method on CacheManager from this method will cause a deadlock. + *

+ * Note that a {@link net.sf.ehcache.event.CacheEventListener} status changed will also be triggered. Any attempt from that notification + * to access CacheManager will also result in a deadlock. + * + * @param cacheName the name of the Cache the operation relates to + */ + public void notifyCacheRemoved(String cacheName) { + + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "Removing " + cacheName + " from RMI listener"); + } + + //don't remove if already removed. + synchronized (cachePeers) { + if (cachePeers.get(cacheName) == null) { + return; + } + } + + RMICachePeer rmiCachePeer; + synchronized (cachePeers) { + rmiCachePeer = (RMICachePeer) cachePeers.remove(cacheName); + } + String url = null; + try { + unbind(rmiCachePeer); + } catch (Exception e) { + throw new CacheException("Error removing Cache Peer " + + url + " from listener. Message was: " + e.getMessage(), e); + } + + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, cachePeers.size() + " RMICachePeers bound in registry for RMI listener"); + } + } + + + /** + * Package local method for testing + */ + void addCachePeer(String name, RMICachePeer peer) { + synchronized (cachePeers) { + cachePeers.put(name, peer); + + } + } +} diff --git a/sandbox/endpoint-ehcache/src/main/java/org/apache/tuscany/sca/endpoint/ehcache/TuscanyRMICacheManagerPeerListenerFactory.java b/sandbox/endpoint-ehcache/src/main/java/org/apache/tuscany/sca/endpoint/ehcache/TuscanyRMICacheManagerPeerListenerFactory.java new file mode 100644 index 0000000000..98ea8603ec --- /dev/null +++ b/sandbox/endpoint-ehcache/src/main/java/org/apache/tuscany/sca/endpoint/ehcache/TuscanyRMICacheManagerPeerListenerFactory.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.endpoint.ehcache; + +import java.net.UnknownHostException; +import java.util.Properties; + +import net.sf.ehcache.CacheException; +import net.sf.ehcache.CacheManager; +import net.sf.ehcache.distribution.CacheManagerPeerListener; +import net.sf.ehcache.distribution.CacheManagerPeerListenerFactory; +import net.sf.ehcache.distribution.RMICacheManagerPeerListener; +import net.sf.ehcache.util.PropertyUtil; + +/** + * Builds a listener based on RMI. + *

+ * Expected configuration line: + *

+ * + * <cachePeerListenerFactory class="net.sf.ehcache.distribution.RMICacheManagerPeerListenerFactory" + * properties="hostName=localhost, port=5000" /> + * + * + * @author Greg Luck + * @version $Id: RMICacheManagerPeerListenerFactory.java 1012 2009-08-20 04:23:00Z gregluck $ + */ +public class TuscanyRMICacheManagerPeerListenerFactory extends CacheManagerPeerListenerFactory { + + /** + * The default timeout for cache replication for a single replication action. + * This may need to be increased for large data transfers. + */ + public static final Integer DEFAULT_SOCKET_TIMEOUT_MILLIS = new Integer(120000); + + private static final String HOSTNAME = "hostName"; + private static final String PORT = "port"; + private static final String REMOTE_OBJECT_PORT = "remoteObjectPort"; + private static final String SOCKET_TIMEOUT_MILLIS = "socketTimeoutMillis"; + + /** + * @param properties implementation specific properties. These are configured as comma + * separated name value pairs in ehcache.xml + */ + public final CacheManagerPeerListener createCachePeerListener(CacheManager cacheManager, Properties properties) + throws CacheException { + String hostName = PropertyUtil.extractAndLogProperty(HOSTNAME, properties); + + String portString = PropertyUtil.extractAndLogProperty(PORT, properties); + Integer port = null; + if (portString != null && portString.length() != 0) { + port = new Integer(portString); + } else { + port = new Integer(0); + } + + //0 means any port in UnicastRemoteObject, so it is ok if not specified to make it 0 + String remoteObjectPortString = PropertyUtil.extractAndLogProperty(REMOTE_OBJECT_PORT, properties); + Integer remoteObjectPort = null; + if (remoteObjectPortString != null && remoteObjectPortString.length() != 0) { + remoteObjectPort = new Integer(remoteObjectPortString); + } else { + remoteObjectPort = new Integer(0); + } + + String socketTimeoutMillisString = PropertyUtil.extractAndLogProperty(SOCKET_TIMEOUT_MILLIS, properties); + Integer socketTimeoutMillis; + if (socketTimeoutMillisString == null || socketTimeoutMillisString.length() == 0) { + socketTimeoutMillis = DEFAULT_SOCKET_TIMEOUT_MILLIS; + } else { + socketTimeoutMillis = new Integer(socketTimeoutMillisString); + } + return doCreateCachePeerListener(hostName, port, remoteObjectPort, cacheManager, socketTimeoutMillis); + } + + /** + * A template method to actually create the factory + * + * @param hostName + * @param port + * @param remoteObjectPort + * @param cacheManager + * @param socketTimeoutMillis @return a crate CacheManagerPeerListener + */ + protected CacheManagerPeerListener doCreateCachePeerListener(String hostName, + Integer port, + Integer remoteObjectPort, + CacheManager cacheManager, + Integer socketTimeoutMillis) { + try { + return new TuscanyRMICacheManagerPeerListener(hostName, port, remoteObjectPort, cacheManager, socketTimeoutMillis); + } catch (UnknownHostException e) { + throw new CacheException("Unable to create CacheManagerPeerListener. Initial cause was " + e.getMessage(), e); + } + } +} diff --git a/sandbox/endpoint-ehcache/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.EndpointRegistry b/sandbox/endpoint-ehcache/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.EndpointRegistry new file mode 100644 index 0000000000..a16785beef --- /dev/null +++ b/sandbox/endpoint-ehcache/src/main/resources/META-INF/services/org.apache.tuscany.sca.runtime.EndpointRegistry @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +org.apache.tuscany.sca.endpoint.tribes.ReplicatedEndpointRegistry;ranking=150,address=228.0.0.100,port=50000,timeout=50,scheme=tribes diff --git a/sandbox/endpoint-ehcache/src/test/java/org/apache/tuscany/sca/endpoint/tribes/RegistryTestCase.java b/sandbox/endpoint-ehcache/src/test/java/org/apache/tuscany/sca/endpoint/tribes/RegistryTestCase.java new file mode 100644 index 0000000000..4053e9fa7d --- /dev/null +++ b/sandbox/endpoint-ehcache/src/test/java/org/apache/tuscany/sca/endpoint/tribes/RegistryTestCase.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.endpoint.tribes; + +import java.util.Properties; + +import junit.framework.Assert; +import net.sf.ehcache.Cache; +import net.sf.ehcache.CacheManager; +import net.sf.ehcache.Ehcache; +import net.sf.ehcache.Element; +import net.sf.ehcache.bootstrap.BootstrapCacheLoader; +import net.sf.ehcache.config.CacheConfiguration; +import net.sf.ehcache.config.Configuration; +import net.sf.ehcache.config.ConfigurationHelper; +import net.sf.ehcache.config.FactoryConfiguration; +import net.sf.ehcache.config.TerracottaConfiguration; +import net.sf.ehcache.distribution.CacheManagerPeerProvider; +import net.sf.ehcache.distribution.RMIBootstrapCacheLoaderFactory; +import net.sf.ehcache.distribution.RMICacheManagerPeerListener; +import net.sf.ehcache.distribution.RMICacheReplicatorFactory; +import net.sf.ehcache.event.CacheEventListener; +import net.sf.ehcache.event.RegisteredEventListeners; +import net.sf.ehcache.store.MemoryStoreEvictionPolicy; + +import org.junit.Test; + +public class RegistryTestCase { + + + @Test + public void foo() throws InterruptedException { + Cache cache2 = createCache2("ehcache2.xml"); +// Thread.sleep(4000); + Cache cache1 = createCache2("ehcache1.xml"); + cache1.put(new Element("key1", "bla1")); + Thread.sleep(400); + + Assert.assertEquals("bla1", cache2.get("key1").getObjectValue()); + cache2.put(new Element("k2", "foo2")); + Thread.sleep(400); + Assert.assertEquals("foo2", cache1.get("k2").getObjectValue()); + } + + private Cache createCache2(String file) { + CacheManager manager = new CacheManager("target/test-classes/" + file); + Cache test = manager.getCache("sampleDistributedCache1"); + + manager.addCache("foo"); + Cache test2 = manager.getCache("foo"); + + return test; + } + +// final Ehcache createCache(CacheConfiguration cacheConfiguration) { +// boolean terracottaClustered = false; +// String terracottaValueMode = null; +// boolean terracottaCoherentReads = true; +// TerracottaConfiguration tcConfiguration = cacheConfiguration.getTerracottaConfiguration(); +// if (tcConfiguration != null) { +// terracottaClustered = tcConfiguration.isClustered(); +// terracottaValueMode = tcConfiguration.getValueMode().name(); +// terracottaCoherentReads = tcConfiguration.getCoherentReads(); +// } +// +// Ehcache cache = new Cache(cacheConfiguration.name, +// cacheConfiguration.maxElementsInMemory, +// cacheConfiguration.memoryStoreEvictionPolicy, +// cacheConfiguration.overflowToDisk, +// getDiskStorePath(), +// cacheConfiguration.eternal, +// cacheConfiguration.timeToLiveSeconds, +// cacheConfiguration.timeToIdleSeconds, +// cacheConfiguration.diskPersistent, +// cacheConfiguration.diskExpiryThreadIntervalSeconds, +// null, +// null, +// cacheConfiguration.maxElementsOnDisk, +// cacheConfiguration.diskSpoolBufferSizeMB, +// cacheConfiguration.clearOnFlush, +// terracottaClustered, +// terracottaValueMode, +// terracottaCoherentReads); +// RegisteredEventListeners listeners = cache.getCacheEventNotificationService(); +// registerCacheListeners(cacheConfiguration, listeners); +// registerCacheExtensions(cacheConfiguration, cache); +// BootstrapCacheLoader bootstrapCacheLoader = createBootstrapCacheLoader( +// cacheConfiguration.getBootstrapCacheLoaderFactoryConfiguration()); +// cache.setBootstrapCacheLoader(bootstrapCacheLoader); +// registerCacheLoaders(cacheConfiguration, cache); +// cache = applyCacheExceptionHandler(cacheConfiguration, cache); +// return cache; +// } + +// private Ehcache create(CacheManager manager, String name) { +// ConfigurationHelper ch = new ConfigurationHelper(manager, null); +// CacheConfiguration cc = new CacheConfiguration(); +// cc. +// +// +// Ehcache cache = new Cache(name, +// 10000, +// MemoryStoreEvictionPolicy.LRU, +// false, +// "", +// true, +// 0, +// 0, +// false, +// 0, +// null, +// null, +// 0, +// 0, +// false, +// false, +// null, +// false); +// RegisteredEventListeners listeners = cache.getCacheEventNotificationService(); +// Properties properties = new Properties(); +// properties.put("asynchronousReplicationIntervalMillis", "100"); +// listeners.registerListener(new RMICacheReplicatorFactory().createCacheEventListener(properties)); +//// registerCacheListeners(cacheConfiguration, listeners); +//// registerCacheExtensions(cacheConfiguration, cache); +// cache.setBootstrapCacheLoader(new RMIBootstrapCacheLoaderFactory().createBootstrapCacheLoader(null)); +//// registerCacheLoaders(cacheConfiguration, cache); +// // cache = applyCacheExceptionHandler(cacheConfiguration, cache); +// return cache; +// } + + private Cache createCache(int port, boolean multicast, String remotes) { + Configuration config = new Configuration(); + + if (remotes != null && remotes.length() > 0) { + FactoryConfiguration factory = new FactoryConfiguration(); + factory.setClass("net.sf.ehcache.distribution.RMICacheManagerPeerProviderFactory"); + factory.setPropertySeparator(","); + factory.setProperties("peerDiscovery=manual,rmiUrls=//" + remotes); + config.addCacheManagerPeerProviderFactory(factory); + } + +// +//--> + + if (multicast) { + FactoryConfiguration factory = new FactoryConfiguration(); + factory.setClass("net.sf.ehcache.distribution.RMICacheManagerPeerProviderFactory"); + factory.setPropertySeparator(","); + factory.setProperties("peerDiscovery=automatic,multicastGroupAddress=230.0.0.1,multicastGroupPort=4446,timeToLive=1"); + config.addCacheManagerPeerProviderFactory(factory); + } + RMICacheManagerPeerListener xxx; + FactoryConfiguration factoryx = new FactoryConfiguration(); + factoryx.setClass("net.sf.ehcache.distribution.RMICacheManagerPeerListenerFactory"); + factoryx.setPropertySeparator(","); + factoryx.setProperties("hostname=192.168.0.101,port=" + port); + config.addCacheManagerPeerListenerFactory(factoryx); + + CacheConfiguration defaultCacheConfiguration = new CacheConfiguration(); + defaultCacheConfiguration.setDiskPersistent(false); + config.setDefaultCacheConfiguration(defaultCacheConfiguration); + CacheManager singletonManager = new CacheManager(config); + CacheManagerPeerProvider pp = singletonManager.getCacheManagerPeerProvider("RMI"); + Cache memoryOnlyCache = new Cache("testCache", 5000, false, false, 5, 2); +// CacheEventListener cacheEventListener = new CacheEventListener(); +// memoryOnlyCache.getCacheEventNotificationService().registerListener(cacheEventListener); + singletonManager.addCache(memoryOnlyCache); + Cache test = singletonManager.getCache("testCache"); + return test; + } + +} diff --git a/sandbox/endpoint-ehcache/src/test/resources/ehcache.xml b/sandbox/endpoint-ehcache/src/test/resources/ehcache.xml new file mode 100644 index 0000000000..5246c4a817 --- /dev/null +++ b/sandbox/endpoint-ehcache/src/test/resources/ehcache.xml @@ -0,0 +1,741 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/sandbox/endpoint-ehcache/src/test/resources/ehcache1.xml b/sandbox/endpoint-ehcache/src/test/resources/ehcache1.xml new file mode 100644 index 0000000000..6ee86bf8b8 --- /dev/null +++ b/sandbox/endpoint-ehcache/src/test/resources/ehcache1.xml @@ -0,0 +1,46 @@ + + + + + + + + + + + + + + + + + + + + diff --git a/sandbox/endpoint-ehcache/src/test/resources/ehcache2.xml b/sandbox/endpoint-ehcache/src/test/resources/ehcache2.xml new file mode 100644 index 0000000000..9ed08b2da8 --- /dev/null +++ b/sandbox/endpoint-ehcache/src/test/resources/ehcache2.xml @@ -0,0 +1,48 @@ + + + + + + + + + + + + + + + + + + + + + + -- cgit v1.2.3