From bdd0a41aed7edf21ec2a65cfa17a86af2ef8c48a Mon Sep 17 00:00:00 2001 From: dims Date: Tue, 17 Jun 2008 00:23:01 +0000 Subject: Move Tuscany from Incubator to top level. git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@668359 13f79535-47bb-0310-9956-ffa450edef68 --- .../discovery/jxta/JxtaDiscoveryService.java | 320 +++++++++++++++++++++ .../service/discovery/jxta/JxtaException.java | 48 ++++ .../service/discovery/jxta/pdp/PeerListener.java | 125 ++++++++ .../discovery/jxta/prp/TuscanyQueryHandler.java | 122 ++++++++ 4 files changed, 615 insertions(+) create mode 100644 sandbox/old/contrib/discovery/jxta/src/main/java/org/apache/tuscany/service/discovery/jxta/JxtaDiscoveryService.java create mode 100644 sandbox/old/contrib/discovery/jxta/src/main/java/org/apache/tuscany/service/discovery/jxta/JxtaException.java create mode 100644 sandbox/old/contrib/discovery/jxta/src/main/java/org/apache/tuscany/service/discovery/jxta/pdp/PeerListener.java create mode 100644 sandbox/old/contrib/discovery/jxta/src/main/java/org/apache/tuscany/service/discovery/jxta/prp/TuscanyQueryHandler.java (limited to 'sandbox/old/contrib/discovery/jxta/src/main/java/org') diff --git a/sandbox/old/contrib/discovery/jxta/src/main/java/org/apache/tuscany/service/discovery/jxta/JxtaDiscoveryService.java b/sandbox/old/contrib/discovery/jxta/src/main/java/org/apache/tuscany/service/discovery/jxta/JxtaDiscoveryService.java new file mode 100644 index 0000000000..ed3cf60fee --- /dev/null +++ b/sandbox/old/contrib/discovery/jxta/src/main/java/org/apache/tuscany/service/discovery/jxta/JxtaDiscoveryService.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.service.discovery.jxta; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.security.cert.CertificateException; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; + +import net.jxta.credential.AuthenticationCredential; +import net.jxta.discovery.DiscoveryService; +import net.jxta.exception.PeerGroupException; +import net.jxta.impl.id.UUID.UUID; +import net.jxta.impl.protocol.ResolverQuery; +import net.jxta.membership.Authenticator; +import net.jxta.membership.MembershipService; +import net.jxta.peer.PeerID; +import net.jxta.peergroup.NetPeerGroupFactory; +import net.jxta.peergroup.PeerGroup; +import net.jxta.platform.NetworkConfigurator; +import net.jxta.protocol.ModuleImplAdvertisement; +import net.jxta.resolver.QueryHandler; +import net.jxta.resolver.ResolverService; + +import org.apache.tuscany.service.discovery.jxta.pdp.PeerListener; +import org.apache.tuscany.service.discovery.jxta.prp.TuscanyQueryHandler; +import org.apache.tuscany.spi.services.discovery.AbstractDiscoveryService; +import org.apache.tuscany.spi.services.discovery.DiscoveryException; +import org.apache.tuscany.spi.services.work.NotificationListener; +import org.apache.tuscany.spi.services.work.NotificationListenerAdaptor; +import org.apache.tuscany.spi.services.work.WorkScheduler; +import org.apache.tuscany.spi.util.stax.StaxUtil; +import org.omg.CORBA.Any; +import org.osoa.sca.annotations.Property; +import org.osoa.sca.annotations.Reference; + +/** + * Discovery service implemented using Apple bonjour. + * + * @version $Revision$ $Date$ + * + */ +public class JxtaDiscoveryService extends AbstractDiscoveryService { + + /** Well known peer group id. */ + private static final TuscanyPeerGroupID PEER_GROUP_ID = + new TuscanyPeerGroupID(new UUID("aea468a4-6450-47dc-a288-a7f1bbcc5927")); + + /** Default discovery interval. */ + private static long DEFAULT_INTERVAL = 10000L; + + /** Peer listener. */ + private PeerListener peerListener; + + /** Resolver service. */ + private ResolverService resolverService; + + /** Domain group. */ + private PeerGroup domainGroup; + + /** Network platform configurator. */ + private NetworkConfigurator configurator; + + /** Work scheduler. */ + private WorkScheduler workScheduler; + + /** Interval for sending discivery messages .*/ + private long interval = DEFAULT_INTERVAL; + + /** Started flag. */ + private final AtomicBoolean started = new AtomicBoolean(); + + /** Message id generator. */ + private final AtomicInteger messageIdGenerator = new AtomicInteger(); + + /** + * Adds a network configurator for this service. + * @param configurator Network configurator. + */ + @Reference + public void setConfigurator(NetworkConfigurator configurator) { + this.configurator = configurator; + } + + /** + * Adds a work scheduler for runningbackground discovery operations. + * @param workScheduler Work scheduler. + */ + @Reference + public void setWorkScheduler(WorkScheduler workScheduler) { + this.workScheduler = workScheduler; + } + + /** + * Sets the interval at which discovery messages are sent. + * @param interval Interval at which discovery messages are sent. + */ + @Property + public void setInterval(long interval) { + this.interval = interval; + } + + /** + * Starts the discovery service. + * @throws Any unexpected JXTA exception to bubble up the call stack. + */ + @Override + public void onStart() throws DiscoveryException { + + Runnable runnable = new Runnable() { + public void run() { + try { + startService(); + } catch(DiscoveryException ex) { + throw new JxtaException(ex); + } + } + }; + + NotificationListener listener = new NotificationListenerAdaptor(); + workScheduler.scheduleWork(runnable, listener); + + } + + /** + * Rusn the discovery service in a different thread. + */ + private void startService() throws DiscoveryException { + + try { + + configure(); + createAndJoinDomainGroup(); + + setupDiscovery(); + setupResolver(); + + started.set(true); + peerListener.start(); + + } catch (PeerGroupException ex) { + throw new DiscoveryException(ex); + } catch (IOException ex) { + throw new DiscoveryException(ex); + } catch (Exception ex) { + throw new DiscoveryException(ex); + } + + } + + /** + * Sends a message to the specified runtime. + * + * @param runtimeId Runtime id of recipient. If null, the message is + * broadcasted to all runtimes in the domain. + * @param content Message content. + * @return The message id. + * @throws DiscoveryException In case of discovery errors. + */ + public int sendMessage(final String runtimeId, final XMLStreamReader content) throws DiscoveryException { + + if(content == null) { + throw new IllegalArgumentException("Content id is null"); + } + + PeerID peerID = null; + if(runtimeId != null) { + peerID = peerListener.getPeerId(runtimeId); + if(peerID == null) { + throw new DiscoveryException("Unrecognized runtime " + runtimeId); + } + } + + String message = null; + try { + StaxUtil.serialize(content); + } catch(XMLStreamException ex) { + throw new DiscoveryException(ex); + } + + int messageId = messageIdGenerator.incrementAndGet(); + + ResolverQuery query = new ResolverQuery(); + query.setHandlerName(TuscanyQueryHandler.class.getSimpleName()); + query.setQuery(message); + query.setSrc(domainGroup.getPeerID().toString()); + + if(peerID == null) { + resolverService.sendQuery(null, query); + } else { + resolverService.sendQuery(peerID.toString(), query); + } + + return messageId; + + } + + /** + * Checks whether the service is started. + * @return True if the service is started. + */ + public boolean isStarted() { + return started.get(); + } + + /** + * Stops the discovery service. + */ + @Override + protected void onStop() { + peerListener.stop(); + started.set(false); + } + + /** + * Configures the platform. + * + */ + private void configure() throws DiscoveryException { + + try { + + String runtimeId = getRuntimeInfo().getRuntimeId(); + + configurator.setName(runtimeId); + configurator.setHome(new File(runtimeId)); + + if (configurator.exists()) { + File pc = new File(configurator.getHome(), "PlatformConfig"); + configurator.load(pc.toURI()); + configurator.save(); + } else { + configurator.save(); + } + + } catch (IOException ex) { + throw new DiscoveryException(ex); + } catch (CertificateException ex) { + throw new DiscoveryException(ex); + } + + } + + /** + * Creates and joins the domain peer group. + * @throws Exception In case of unexpected JXTA exceptions. + */ + private void createAndJoinDomainGroup() throws Exception { + + String domain = getRuntimeInfo().getDomain().toString(); + + PeerGroup netGroup = new NetPeerGroupFactory().getInterface(); + ModuleImplAdvertisement implAdv = netGroup.getAllPurposePeerGroupImplAdvertisement(); + domainGroup = netGroup.newGroup(PEER_GROUP_ID, implAdv, domain, "Tuscany domain group"); + + AuthenticationCredential authCred = new AuthenticationCredential(domainGroup, null, null); + MembershipService membership = domainGroup.getMembershipService(); + Authenticator auth = membership.apply(authCred); + + if (auth.isReadyForJoin()){ + membership.join(auth); + } else { + throw new DiscoveryException("Unable to join domain group"); + } + + } + + /** + * Sets up the resolver service. + */ + private void setupResolver() { + + resolverService = domainGroup.getResolverService(); + QueryHandler queryHandler = new TuscanyQueryHandler(resolverService, this); + resolverService.registerHandler(TuscanyQueryHandler.class.getSimpleName(), queryHandler); + + } + + /** + * Sets up peer discovery service. + */ + private void setupDiscovery() { + + final DiscoveryService discoveryService = domainGroup.getDiscoveryService(); + discoveryService.remotePublish(domainGroup.getPeerAdvertisement()); + peerListener = new PeerListener(discoveryService, interval, getRuntimeInfo().getRuntimeId()); + + } + + /* + * Well known peer grroup. + */ + @SuppressWarnings("serial") + private static class TuscanyPeerGroupID extends net.jxta.impl.id.CBID.PeerGroupID { + public TuscanyPeerGroupID(UUID uuid) { + super(uuid); + } + } + +} diff --git a/sandbox/old/contrib/discovery/jxta/src/main/java/org/apache/tuscany/service/discovery/jxta/JxtaException.java b/sandbox/old/contrib/discovery/jxta/src/main/java/org/apache/tuscany/service/discovery/jxta/JxtaException.java new file mode 100644 index 0000000000..06d542c705 --- /dev/null +++ b/sandbox/old/contrib/discovery/jxta/src/main/java/org/apache/tuscany/service/discovery/jxta/JxtaException.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.service.discovery.jxta; + +import org.apache.tuscany.api.TuscanyRuntimeException; + +/** + * Exception that translates unexpected JXTA exceptions to tuscany runtime exceptions. + * + * @version $Revision$ $Date$ + * + */ +@SuppressWarnings("serial") +public class JxtaException extends TuscanyRuntimeException { + + /** + * Initializes the error message. * + * @param message Initializes the error message. + */ + public JxtaException(String message) { + super(message); + } + + /** + * Initializes the root cause. + * @param cause Initializes the root cause. + */ + public JxtaException(Throwable cause) { + super(cause); + } + +} diff --git a/sandbox/old/contrib/discovery/jxta/src/main/java/org/apache/tuscany/service/discovery/jxta/pdp/PeerListener.java b/sandbox/old/contrib/discovery/jxta/src/main/java/org/apache/tuscany/service/discovery/jxta/pdp/PeerListener.java new file mode 100644 index 0000000000..adc67ce263 --- /dev/null +++ b/sandbox/old/contrib/discovery/jxta/src/main/java/org/apache/tuscany/service/discovery/jxta/pdp/PeerListener.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.service.discovery.jxta.pdp; + +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.tuscany.service.discovery.jxta.JxtaException; + +import net.jxta.discovery.DiscoveryEvent; +import net.jxta.discovery.DiscoveryListener; +import net.jxta.discovery.DiscoveryService; +import net.jxta.peer.PeerID; +import net.jxta.protocol.DiscoveryResponseMsg; +import net.jxta.protocol.PeerAdvertisement; + +/** + * Listener that keeps track of peers in the same peer group. + * + * @version $Revision$ $Date$ + * + */ +public class PeerListener implements DiscoveryListener { + + /** Discovery service to use. */ + private DiscoveryService discoveryService; + + /** Interval for sending discivery messages. */ + private long interval; + + /** Liveness indicator. */ + private AtomicBoolean live = new AtomicBoolean(); + + /** Owning runtime. */ + private String runtimeId; + + /** Available peers. */ + private Map availablePeers = new HashMap(); + + /** + * Initializes the JXTA discovery service. + * @param discoveryService JXTA discovery service. + * @param interval Interval between sending discovery messages. + * @param runtimeId Runtime that owns this peer. + */ + public PeerListener(DiscoveryService discoveryService, long interval, String runtimeId) { + this.discoveryService = discoveryService; + this.interval = interval; + this.runtimeId = runtimeId; + } + + /** + * Sends discovery messages for peer advertisements. + */ + public void start() { + + live.set(true); + discoveryService.addDiscoveryListener(this); + while(live.get()) { + discoveryService.getRemoteAdvertisements(null, DiscoveryService.PEER, null, null, 5); + try { + Thread.sleep(interval); + } catch(InterruptedException ex) { + throw new JxtaException(ex); + } + + } + + } + + /** + * returns the peer id for the runtime id. + * @param runtimeId Runtime id for which peer id is requested. + * @return Peer id. + */ + public synchronized PeerID getPeerId(String runtimeId) { + return availablePeers.get(runtimeId); + } + + /** + * Listens for discovery event. + */ + public synchronized void discoveryEvent(DiscoveryEvent event) { + + DiscoveryResponseMsg res = event.getResponse(); + Enumeration en = res.getAdvertisements(); + if (en != null ) { + while (en.hasMoreElements()) { + PeerAdvertisement adv = (PeerAdvertisement) en.nextElement(); + String peerName = adv.getName(); + if(!runtimeId.equals(peerName)) { + availablePeers.put(adv.getName(), adv.getPeerID()); + } + } + } + System.err.println("Peer view for " + runtimeId + ": " + availablePeers.keySet()); + + } + + /** + * Stops the pipe listener. + */ + public void stop() { + live.set(false); + } + +} diff --git a/sandbox/old/contrib/discovery/jxta/src/main/java/org/apache/tuscany/service/discovery/jxta/prp/TuscanyQueryHandler.java b/sandbox/old/contrib/discovery/jxta/src/main/java/org/apache/tuscany/service/discovery/jxta/prp/TuscanyQueryHandler.java new file mode 100644 index 0000000000..f1d4a2ae2f --- /dev/null +++ b/sandbox/old/contrib/discovery/jxta/src/main/java/org/apache/tuscany/service/discovery/jxta/prp/TuscanyQueryHandler.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.service.discovery.jxta.prp; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; + +import net.jxta.impl.protocol.ResolverResponse; +import net.jxta.protocol.ResolverQueryMsg; +import net.jxta.protocol.ResolverResponseMsg; +import net.jxta.resolver.QueryHandler; +import net.jxta.resolver.ResolverService; + +import org.apache.tuscany.service.discovery.jxta.JxtaDiscoveryService; +import org.apache.tuscany.service.discovery.jxta.JxtaException; +import org.apache.tuscany.spi.services.discovery.RequestListener; +import org.apache.tuscany.spi.services.discovery.ResponseListener; +import org.apache.tuscany.spi.util.stax.StaxUtil; + +/** + * Generic quety handler for tuscany PRP (Peer Resolver Protocol) messages. The + * processQuery method is invoked on the receiver and the + * processResponse is invoked on the sender when the receiver responds. + * @version $Revision$ $Date$ + * + */ +public class TuscanyQueryHandler implements QueryHandler { + + /** Resolver service for sending responses. */ + private final ResolverService resolverService; + + /** Discovery service. */ + private final JxtaDiscoveryService discoveryService; + + /** + * Initializes the JXTA resolver service and tuscany discovery service. + * + * @param resolverService Resolver service. + * @param discoveryService Tuscany discovery service. + */ + public TuscanyQueryHandler(final ResolverService resolverService, final JxtaDiscoveryService discoveryService) { + this.resolverService = resolverService; + this.discoveryService = discoveryService; + } + + /** + * Processes a query message. + */ + public int processQuery(ResolverQueryMsg queryMessage) { + + try { + + final String message = queryMessage.getQuery(); + final int queryId = queryMessage.getQueryId(); + final String source = queryMessage.getSrc(); + final String handler = queryMessage.getHandlerName(); + + final QName messageType = StaxUtil.getDocumentElementQName(message); + RequestListener messageListener = discoveryService.getRequestListener(messageType); + if(messageListener != null) { + + XMLStreamReader requestReader = StaxUtil.createReader(message); + XMLStreamReader responseReader = messageListener.onRequest(requestReader); + String response = StaxUtil.serialize(responseReader); + + ResolverResponse responseMessage = new ResolverResponse(); + responseMessage.setResponse(response); + responseMessage.setHandlerName(handler); + responseMessage.setQueryId(queryId); + + resolverService.sendResponse(source, responseMessage); + + } + return ResolverService.OK; + + } catch(XMLStreamException ex) { + throw new JxtaException(ex); + } + + } + + /** + * Processes a response message. + */ + public void processResponse(ResolverResponseMsg responseMessage) { + + try { + + final String message = responseMessage.getResponse(); + final int queryId = responseMessage.getQueryId(); + + final QName messageType = StaxUtil.getDocumentElementQName(message); + ResponseListener messageListener = discoveryService.getResponseListener(messageType); + if(messageListener != null) { + XMLStreamReader responseReader = StaxUtil.createReader(message); + messageListener.onResponse(responseReader, queryId); + } + + } catch(XMLStreamException ex) { + throw new JxtaException(ex); + } + + } + +} -- cgit v1.2.3