From bdd0a41aed7edf21ec2a65cfa17a86af2ef8c48a Mon Sep 17 00:00:00 2001 From: dims Date: Tue, 17 Jun 2008 00:23:01 +0000 Subject: Move Tuscany from Incubator to top level. git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@668359 13f79535-47bb-0310-9956-ffa450edef68 --- .../notification/NotificationTypeManagerImpl.java | 692 +++++++++++++++++++++ 1 file changed, 692 insertions(+) create mode 100644 branches/sca-java-1.0.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManagerImpl.java (limited to 'branches/sca-java-1.0.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManagerImpl.java') diff --git a/branches/sca-java-1.0.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManagerImpl.java b/branches/sca-java-1.0.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManagerImpl.java new file mode 100644 index 0000000000..94689b4d10 --- /dev/null +++ b/branches/sca-java-1.0.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManagerImpl.java @@ -0,0 +1,692 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification; + +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.servlet.ServletInputStream; +import javax.servlet.ServletOutputStream; + +import org.apache.tuscany.sca.binding.notification.encoding.Broker; +import org.apache.tuscany.sca.binding.notification.encoding.BrokerConsumerReference; +import org.apache.tuscany.sca.binding.notification.encoding.BrokerID; +import org.apache.tuscany.sca.binding.notification.encoding.BrokerProducerReference; +import org.apache.tuscany.sca.binding.notification.encoding.Brokers; +import org.apache.tuscany.sca.binding.notification.encoding.Constants; +import org.apache.tuscany.sca.binding.notification.encoding.EncodingException; +import org.apache.tuscany.sca.binding.notification.encoding.EncodingObject; +import org.apache.tuscany.sca.binding.notification.encoding.EncodingRegistry; +import org.apache.tuscany.sca.binding.notification.encoding.EncodingUtils; +import org.apache.tuscany.sca.binding.notification.encoding.EndConsumers; +import org.apache.tuscany.sca.binding.notification.encoding.EndProducers; +import org.apache.tuscany.sca.binding.notification.encoding.EndpointAddress; +import org.apache.tuscany.sca.binding.notification.encoding.EndpointReference; +import org.apache.tuscany.sca.binding.notification.encoding.EndpointReferenceWrapper; +import org.apache.tuscany.sca.binding.notification.encoding.NeighborBrokerConsumers; +import org.apache.tuscany.sca.binding.notification.encoding.Neighbors; +import org.apache.tuscany.sca.binding.notification.encoding.NewBroker; +import org.apache.tuscany.sca.binding.notification.encoding.NewBrokerAck; +import org.apache.tuscany.sca.binding.notification.encoding.NewBrokerResponse; +import org.apache.tuscany.sca.binding.notification.encoding.NewConsumer; +import org.apache.tuscany.sca.binding.notification.encoding.NewConsumerResponse; +import org.apache.tuscany.sca.binding.notification.encoding.NewProducer; +import org.apache.tuscany.sca.binding.notification.encoding.NewProducerResponse; +import org.apache.tuscany.sca.binding.notification.encoding.RemoveBroker; +import org.apache.tuscany.sca.binding.notification.encoding.RemovedBroker; +import org.apache.tuscany.sca.binding.notification.encoding.ReplaceBrokerConnection; +import org.apache.tuscany.sca.binding.notification.util.IOUtils; +import org.apache.tuscany.sca.binding.notification.util.NotificationServlet; +import org.apache.tuscany.sca.binding.notification.util.URIUtil; +import org.apache.tuscany.sca.binding.notification.util.IOUtils.IOUtilsException; +import org.apache.tuscany.sca.binding.notification.util.IOUtils.ReadableContinuation; +import org.apache.tuscany.sca.binding.notification.util.IOUtils.Writeable; +import org.apache.tuscany.sca.binding.notification.util.NotificationServlet.NotificationServletStreamHandler; +import org.apache.tuscany.sca.host.http.ServletHost; + +/** + * A notification type manager serves as a registry of producers and consumers, or brokers, for + * any notification type. This class implements an interface that allows a reference provider + * (a producer), a service provider (a consumer), or both (a broker, via the provider factory), + * to access locally the ntm for its notification type, regardless of whether the ntm resides + * locally or remotely. + * At a given host there is only one reference provider and/or one service provider for any given + * notification type. So, if the ntm for a notification type resides locally, then it is invoked + * exclusively by either a reference provider (newProducer), a service provider (newConsumer), or + * a provider factory (newBroker). And since these invocations occur when the providers are being + * created then all three of consumerLists, producerLists and brokerLists must be null when these + * invocations occur. + * + * @version $Rev$ $Date$ + */ +public class NotificationTypeManagerImpl implements NotificationTypeManager { + + private static final String ntmPathBase = "/ntm"; + + private ServletHost servletHost; + private EncodingRegistry encodingRegistry; + private Map ntmHandlers; + + public NotificationTypeManagerImpl() { + } + + public void setServletHost(ServletHost servletHost) { + this.servletHost = servletHost; + } + + public void setEncodingRegistry(EncodingRegistry encodingRegistry) { + this.encodingRegistry = encodingRegistry; + } + + public void init() { + ntmHandlers = new HashMap(); + } + + public String newConsumer(URI notificationType, URL consumerUrl, URL remoteNtmUrl, List producerListResult) { + if (ntmUrlIsRemote(consumerUrl, remoteNtmUrl)) { + try { + WriteableEPW wEPW = new WriteableEPW(new NewConsumer(), consumerUrl); + InputStreamDecoder isd = new InputStreamDecoder(); + NewConsumerResponse ncr = + (NewConsumerResponse)IOUtils.sendHttpRequest(remoteNtmUrl, Constants.NEW_CONSUMER_OP, wEPW, isd); + String sequenceType = ncr.getSequenceType(); + if (Constants.EndProducers.equals(sequenceType) || Constants.BrokerProducers.equals(sequenceType)) { + for (EndpointReference epr : ncr.getReferenceSequence()) { + producerListResult.add(epr.getEndpointAddress().getAddress()); + } + } + return sequenceType; + } catch(Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + else { + NotificationTypeManagerHandler ntmHandler = ntmHandlers.get(notificationType); + if (ntmHandler != null) { + throw new RuntimeException("Trying to deploy local consumer with existing local producer, consumer or broker"); + } + + createNtmHandler(consumerUrl.getAuthority(), notificationType, consumerUrl, null, null); + + return Constants.NoProducers; + } + } + + private void createNtmHandler(String ntmUriAuthority, URI notificationType, URL consumerUrl, URL producerUrl, BrokerStruct broker) { + String ntmUri = "http://" + ntmUriAuthority + ntmPathBase + URIUtil.getPath(notificationType); + NotificationTypeManagerHandler ntmh = new NotificationTypeManagerHandler(notificationType, consumerUrl, producerUrl, broker); + ntmHandlers.put(notificationType, ntmh); + servletHost.addServletMapping(ntmUri, new NotificationServlet(ntmh)); + } + + public String newProducer(URI notificationType, URL producerUrl, URL remoteNtmUrl, List consumerListResult) { + if (ntmUrlIsRemote(producerUrl, remoteNtmUrl)) { + try { + WriteableEPW wEPW = new WriteableEPW(new NewProducer(), producerUrl); + InputStreamDecoder isd = new InputStreamDecoder(); + NewProducerResponse npr = + (NewProducerResponse)IOUtils.sendHttpRequest(remoteNtmUrl, Constants.NEW_PRODUCER_OP, wEPW, isd); + String sequenceType = npr.getSequenceType(); + if (Constants.EndConsumers.equals(sequenceType) || Constants.BrokerConsumers.equals(sequenceType)) { + for (EndpointReference epr : npr.getReferenceSequence()) { + consumerListResult.add(epr.getEndpointAddress().getAddress()); + } + } + return sequenceType; + } catch(Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + else { + NotificationTypeManagerHandler ntmHandler = ntmHandlers.get(notificationType); + if (ntmHandler != null) { + throw new RuntimeException("Trying to deploy local producer with existing local producer, consumer or broker"); + } + + createNtmHandler(producerUrl.getAuthority(), notificationType, null, producerUrl, null); + + return Constants.NoConsumers; + } + } + + public boolean newBroker(URI notificationType, + URL consumerUrl, + URL producerUrl, + String brokerID, + URL remoteNtmUrl, + List consumerListResult, + List producerListResult) { + String ntmUriAuthority = producerUrl.getAuthority(); + if (!ntmUriAuthority.equals(consumerUrl.getAuthority())) { + throw new RuntimeException("Producer url and consumer url do not match"); + } + if (ntmUrlIsRemote(producerUrl, remoteNtmUrl)) { + try { + WriteableNewBroker wnb = new WriteableNewBroker(consumerUrl, producerUrl, brokerID); + InputStreamDecoder isd = new InputStreamDecoder(); + NewBrokerResponse nbr = + (NewBrokerResponse)IOUtils.sendHttpRequest(remoteNtmUrl, Constants.NEW_BROKER_OP, wnb, isd); + if (nbr.isFirstBroker()) { + if (nbr.getEndConsumers().getSequenceType().equals(Constants.EndConsumers)) { + for (EndpointReference epr : nbr.getEndConsumers().getReferenceSequence()) { + consumerListResult.add(epr); + } + } + if (nbr.getEndProducers().getSequenceType().equals(Constants.EndProducers)) { + for (EndpointReference epr : nbr.getEndProducers().getReferenceSequence()) { + producerListResult.add(epr); + } + } + } + else { + for (Broker broker : nbr.getBrokers().getBrokerSequence()) { + consumerListResult.add(broker.getBrokerConsumerReference().getReference()); + producerListResult.add(broker.getBrokerProducerReference().getReference()); + } + } + return nbr.isFirstBroker(); + } catch(Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + else { + NotificationTypeManagerHandler ntmHandler = ntmHandlers.get(notificationType); + if (ntmHandler != null) { + throw new RuntimeException("Trying to deploy local broker with existing local producer, consumer or broker"); + } + + BrokerStruct broker = new BrokerStruct(consumerUrl, producerUrl, brokerID); + createNtmHandler(ntmUriAuthority, notificationType, null, null, broker); + + return true; + } + } + + private boolean ntmUrlIsRemote(URL localUrl, URL ntmUrl) { + if (ntmUrl == null) { + return false; + } + if (localUrl.getPort() != ntmUrl.getPort()) { + return true; + } + String remoteNtmUrlAuthority = ntmUrl.getAuthority(); + if (remoteNtmUrlAuthority.indexOf("localhost") >= 0) { + return false; + } + return !localUrl.getAuthority().equals(remoteNtmUrlAuthority); + } + + public void newBrokerAck(URL remoteNtmUrl) { + try { + IOUtils.sendHttpRequest(remoteNtmUrl, Constants.NEW_BROKER_ACK_OP, new WriteableNewBrokerAck(), null); + } catch(Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + public void removeBroker(EndpointReference brokerConsumerEpr, List neighborBrokerConsumerEprs, URL remoteNtmUrl) { + WriteableRemoveBroker wrb = new WriteableRemoveBroker(brokerConsumerEpr, neighborBrokerConsumerEprs); + + try { + IOUtils.sendHttpRequest(remoteNtmUrl, Constants.REMOVE_BROKER_OP, wrb, null); + } catch(Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + private class NotificationTypeManagerHandler implements NotificationServletStreamHandler { + + private URI notificationType; + List consumerList; + List producerList; + List brokerList; + private NotificationTypeLock notificationTypeLock; + private BrokerStruct pendingBroker; + + public NotificationTypeManagerHandler(URI notificationType) { + this.notificationType = notificationType; + this.notificationTypeLock = new NotificationTypeLock(); + this.pendingBroker = null; + } + + public NotificationTypeManagerHandler(URI notificationType, URL consumerUrl, URL producerUrl, BrokerStruct broker) { + this(notificationType); + if (consumerUrl != null) { + addConsumer(consumerUrl); + } + else if (producerUrl != null) { + addProducer(producerUrl); + } + else if (broker != null) { + addBroker(broker); + } + } + + private void addConsumer(URL consumerUrl) { + if (consumerList == null) { + consumerList = new ArrayList(); + } + consumerList.add(consumerUrl); + } + + private void addProducer(URL producerUrl) { + if (producerList == null) { + producerList = new ArrayList(); + } + producerList.add(producerUrl); + } + + private void addBroker(BrokerStruct broker) { + if (brokerList == null) { + brokerList = new ArrayList(); + } + brokerList.add(broker); + } + + public void handle(Map headers, ServletInputStream istream, int contentLength, ServletOutputStream ostream) { + String opHeader = headers.get(IOUtils.Notification_Operation); + EncodingObject eo = null; + try { + eo = EncodingUtils.decodeFromStream(encodingRegistry, istream); + } + catch(EncodingException e) { + throw new RuntimeException(e); + } + + if (Constants.NEW_CONSUMER_OP.equals(opHeader)) { + handleNewConsumer((NewConsumer)eo, ostream); + } + else if(Constants.NEW_PRODUCER_OP.equals(opHeader)) { + handleNewProducer((NewProducer)eo, ostream); + } + else if(Constants.NEW_BROKER_OP.equals(opHeader)) { + handleNewBroker((NewBroker)eo, ostream); + } + else if (Constants.NEW_BROKER_ACK_OP.equals(opHeader)) { + handleNewBrokerAck(); + } + else if (Constants.REMOVE_BROKER_OP.equals(opHeader)) { + handleRemoveBroker((RemoveBroker)eo); + } + } + + private void handleNewConsumer(NewConsumer nc, ServletOutputStream ostream) { + synchronized(notificationTypeLock) { + if (notificationTypeLock.isLocked) { + try { notificationTypeLock.wait(); } catch(InterruptedException e) {} + } + URL consumerUrl = nc.getReference().getEndpointAddress().getAddress(); + if (brokerList == null) { + addConsumer(consumerUrl); + } + + NewConsumerResponse ncr = new NewConsumerResponse(); + if (producerList != null) { + ncr.setSequenceType(Constants.EndProducers); + for (URL producerUrl : producerList) { + ncr.addReferenceToSequence(EncodingUtils.createEndpointReference(producerUrl, null)); + } + } + else if(brokerList != null) { + ncr.setSequenceType(Constants.BrokerProducers); + for (BrokerStruct broker : brokerList) { + ncr.addReferenceToSequence(EncodingUtils.createEndpointReference(broker.producerUrl, null)); + } + } + else { + ncr.setSequenceType(Constants.NoProducers); + } + try { + EncodingUtils.encodeToStream(encodingRegistry, ncr, ostream); + } + catch(IOUtilsException e) { + throw new RuntimeException(e); + } + } + } + + private void handleNewProducer(NewProducer np, ServletOutputStream ostream) { + synchronized(notificationTypeLock) { + if (notificationTypeLock.isLocked) { + try { notificationTypeLock.wait(); } catch(InterruptedException e) {} + } + URL producerUrl = np.getReference().getEndpointAddress().getAddress(); + if (brokerList == null) { + addProducer(producerUrl); + } + + NewProducerResponse npr = new NewProducerResponse(); + if (consumerList != null) { + npr.setSequenceType(Constants.EndConsumers); + for (URL consumerUrl : consumerList) { + npr.addReferenceToSequence(EncodingUtils.createEndpointReference(consumerUrl, null)); + } + } + else if(brokerList != null) { + npr.setSequenceType(Constants.BrokerConsumers); + for (BrokerStruct broker : brokerList) { + npr.addReferenceToSequence(EncodingUtils.createEndpointReference(broker.consumerUrl, null)); + } + } + else { + npr.setSequenceType(Constants.NoConsumers); + } + try { + EncodingUtils.encodeToStream(encodingRegistry, npr, ostream); + } + catch(IOUtilsException e) { + throw new RuntimeException(e); + } + } + } + + private void handleNewBroker(NewBroker nb, ServletOutputStream ostream) { + synchronized(notificationTypeLock) { + if (notificationTypeLock.isLocked) { + try { notificationTypeLock.wait(); } catch(InterruptedException e) {} + } + NewBrokerResponse nbr = new NewBrokerResponse(); + if (consumerList != null || producerList != null || brokerList == null) { + nbr.setFirstBroker(true); + EndConsumers endConsumers = new EndConsumers(); + if (consumerList != null) { + endConsumers.setSequenceType(Constants.EndConsumers); + for (URL consumerUrl : consumerList) { + endConsumers.addReferenceToSequence(EncodingUtils.createEndpointReference(consumerUrl, null)); + } + } + else { + endConsumers.setSequenceType(Constants.NoConsumers); + } + nbr.setEndConsumers(endConsumers); + EndProducers endProducers = new EndProducers(); + if (producerList != null) { + endProducers.setSequenceType(Constants.EndProducers); + for (URL producerUrl : producerList) { + endProducers.addReferenceToSequence(EncodingUtils.createEndpointReference(producerUrl, null)); + } + } + else { + endProducers.setSequenceType(Constants.NoProducers); + } + nbr.setEndProducers(endProducers); + } + else { + nbr.setFirstBroker(false); + Brokers brokers = new Brokers(); + for (BrokerStruct brokerStruct : brokerList) { + Broker brokerElt = new Broker(); + BrokerConsumerReference bcr = new BrokerConsumerReference(); + bcr.setReference(EncodingUtils.createEndpointReference(brokerStruct.consumerUrl, brokerStruct.brokerID)); + brokerElt.setBrokerConsumerReference(bcr); + + BrokerProducerReference bpr = new BrokerProducerReference(); + bpr.setReference(EncodingUtils.createEndpointReference(brokerStruct.producerUrl, brokerStruct.brokerID)); + brokerElt.setBrokerProducerReference(bpr); + brokers.addBrokerToSequence(brokerElt); + } + nbr.setBrokers(brokers); + } + EndpointReference consumerEPR = nb.getBrokerConsumerReference().getReference(); + URL consumerUrl = consumerEPR.getEndpointAddress().getAddress(); + BrokerID consumerBrokerID = consumerEPR.getReferenceProperties().getProperty(BrokerID.class); + EndpointReference producerEPR = nb.getBrokerProducerReference().getReference(); + URL producerUrl = producerEPR.getEndpointAddress().getAddress(); + BrokerID producerBrokerID = producerEPR.getReferenceProperties().getProperty(BrokerID.class); + if (consumerBrokerID == null || + producerBrokerID == null || + !consumerBrokerID.getID().equals(producerBrokerID.getID())) { + throw new RuntimeException("Producer and consumer broker ids do not match"); + } + // only add broker if consumerList == null && producerList == null + // otherwise, make it a pending broker and wait for ack + // TODO block for a configurable amount of time + BrokerStruct broker = new BrokerStruct(consumerUrl, producerUrl, consumerBrokerID.getID()); + if (consumerList == null && producerList == null) { + addBroker(broker); + } + else { + pendingBroker = broker; + notificationTypeLock.isLocked = true; + } + try { + EncodingUtils.encodeToStream(encodingRegistry, nbr, ostream); + } + catch(IOUtilsException e) { + throw new RuntimeException(e); + } + } + } + + private void handleNewBrokerAck() { + synchronized(notificationTypeLock) { + if (!notificationTypeLock.isLocked) { + notificationTypeLock.notifyAll(); + throw new RuntimeException("Notification type should be locked"); + } + if (brokerList != null) { + notificationTypeLock.isLocked = false; + notificationTypeLock.notifyAll(); + throw new RuntimeException("Can't add pending broker to non-empty broker list"); + } + if (pendingBroker == null) { + notificationTypeLock.isLocked = false; + notificationTypeLock.notifyAll(); + throw new RuntimeException("Missing pending broker"); + } + addBroker(pendingBroker); + consumerList = null; + producerList = null; + pendingBroker = null; + notificationTypeLock.isLocked = false; + notificationTypeLock.notifyAll(); + } + } + + private void handleRemoveBroker(RemoveBroker rb) { + synchronized(notificationTypeLock) { + if (notificationTypeLock.isLocked) { + try { notificationTypeLock.wait(); } catch(InterruptedException e) {} + } + + if (brokerList == null) { + throw new RuntimeException("No broker to remove for [" + notificationType + "]"); + } + + NeighborBrokerConsumers nbcs = rb.getNeighborBrokerConsumers(); + EndpointReference rbEpr = rb.getBrokerConsumerReference().getReference(); + if (nbcs != null && nbcs.getReferenceSequence() != null) { + List neighborBrokers = new ArrayList(); + for (EndpointReference neighborBrokerConsumerEpr : nbcs.getReferenceSequence()) { + BrokerStruct neighborBrokerStruct = null; + URL neighborBrokerConsumerEprUrl = neighborBrokerConsumerEpr.getEndpointAddress().getAddress(); + for (BrokerStruct brokerStruct : brokerList) { + if (brokerStruct.consumerUrl.equals(neighborBrokerConsumerEprUrl)) { + neighborBrokerStruct = brokerStruct; + break; + } + } + if (neighborBrokerStruct == null) { + throw new RuntimeException("Can't find neighbor broker for consumer EPR [" + + neighborBrokerConsumerEprUrl + "]"); + } + BrokerConsumerReference bcr = new BrokerConsumerReference(); + bcr.setReference(EncodingUtils.createEndpointReference(neighborBrokerStruct.consumerUrl, neighborBrokerStruct.brokerID)); + BrokerProducerReference bpr = new BrokerProducerReference(); + bpr.setReference(EncodingUtils.createEndpointReference(neighborBrokerStruct.producerUrl, neighborBrokerStruct.brokerID)); + Broker neighborBroker = new Broker(); + neighborBroker.setBrokerConsumerReference(bcr); + neighborBroker.setBrokerProducerReference(bpr); + neighborBrokers.add(neighborBroker); + } + int lastIndex = neighborBrokers.size() - 1; + for (int index = lastIndex; index >= 0; index--) { + List writeableNeighborBrokers = ((index > 0) ? neighborBrokers.subList(0, index) : null); + WriteableReplaceBrokerConnection wrbc = new WriteableReplaceBrokerConnection(rbEpr, writeableNeighborBrokers); + URL targetUrl = + neighborBrokers.get(index).getBrokerProducerReference().getReference().getEndpointAddress().getAddress(); + try { + IOUtils.sendHttpRequest(targetUrl, Constants.REPLACE_BROKER_CONNECTION_OP, wrbc, null); + } catch(Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + } + + BrokerStruct removedBrokerStruct = null; + URL rbEprUrl = rbEpr.getEndpointAddress().getAddress(); + for (BrokerStruct brokerSruct : brokerList) { + if (brokerSruct.consumerUrl.equals(rbEprUrl)) { + removedBrokerStruct = brokerSruct; + break; + } + } + if (removedBrokerStruct == null) { + throw new RuntimeException("Can't find broker to remove for EPR [" + rbEprUrl + "]"); + } + if(!brokerList.remove(removedBrokerStruct)) { + throw new RuntimeException("Broker was not removed"); + } + } + } + } + + class NotificationTypeLock { + public boolean isLocked; + } + + class WriteableEPW implements Writeable { + private EndpointReferenceWrapper epw; + + public WriteableEPW(EndpointReferenceWrapper epw, URL url) { + EndpointAddress epa = new EndpointAddress(); + epa.setAddress(url); + EndpointReference epr = new EndpointReference(); + epr.setEndpointAddress(epa); + epw.setReference(epr); + this.epw = epw; + } + + public void write(OutputStream os) throws IOUtilsException { + EncodingUtils.encodeToStream(encodingRegistry, epw, os); + } + } + + class InputStreamDecoder implements ReadableContinuation { + + public Object read(InputStream istream) throws IOUtilsException { + try { + return EncodingUtils.decodeFromStream(encodingRegistry, istream); + } + catch(EncodingException e) { + throw new IOUtilsException(e); + } + } + } + + class BrokerStruct { + public URL consumerUrl; + public URL producerUrl; + public String brokerID; + + public BrokerStruct(URL consumerUrl, URL producerUrl, String brokerID) { + this.consumerUrl = consumerUrl; + this.producerUrl = producerUrl; + this.brokerID = brokerID; + } + } + + class WriteableNewBroker implements Writeable { + private NewBroker newBroker; + + public WriteableNewBroker(URL consumerUrl, URL producerUrl, String brokerID) { + newBroker = new NewBroker(); + BrokerConsumerReference bcr = new BrokerConsumerReference(); + bcr.setReference(EncodingUtils.createEndpointReference(consumerUrl, brokerID)); + newBroker.setBrokerConsumerReference(bcr); + + BrokerProducerReference bpr = new BrokerProducerReference(); + bpr.setReference(EncodingUtils.createEndpointReference(producerUrl, brokerID)); + newBroker.setBrokerProducerReference(bpr); + } + + public void write(OutputStream os) throws IOUtilsException { + EncodingUtils.encodeToStream(encodingRegistry, newBroker, os); + } + } + + class WriteableNewBrokerAck implements Writeable { + public void write(OutputStream os) throws IOUtilsException { + EncodingUtils.encodeToStream(encodingRegistry, new NewBrokerAck(), os); + } + } + + class WriteableRemoveBroker implements Writeable { + private RemoveBroker removeBroker; + + public WriteableRemoveBroker(EndpointReference brokerConsumerEpr, List neighborBrokerConsumerEprs) { + removeBroker = new RemoveBroker(); + BrokerConsumerReference brokerConsumerReference = new BrokerConsumerReference(); + brokerConsumerReference.setReference(brokerConsumerEpr); + removeBroker.setBrokerConsumerReference(brokerConsumerReference); + if (neighborBrokerConsumerEprs != null) { + NeighborBrokerConsumers neighborBrokerConsumers = new NeighborBrokerConsumers(); + neighborBrokerConsumers.setReferenceSequence(neighborBrokerConsumerEprs); + neighborBrokerConsumers.setSequenceType(Constants.BrokerConsumers); + removeBroker.setNeighborBrokerConsumers(neighborBrokerConsumers); + } + } + + public void write(OutputStream os) throws IOUtilsException { + EncodingUtils.encodeToStream(encodingRegistry, removeBroker, os); + } + } + + class WriteableReplaceBrokerConnection implements Writeable { + private ReplaceBrokerConnection replaceBrokerConnection; + + public WriteableReplaceBrokerConnection(EndpointReference removedBrokerEpr, List brokerSequence) { + replaceBrokerConnection = new ReplaceBrokerConnection(); + RemovedBroker removedBroker = new RemovedBroker(); + removedBroker.setReference(removedBrokerEpr); + replaceBrokerConnection.setRemovedBroker(removedBroker); + if (brokerSequence != null) { + Neighbors neighbors = new Neighbors(); + neighbors.setBrokerSequence(brokerSequence); + replaceBrokerConnection.setNeighbors(neighbors); + } + } + + public void write(OutputStream os) throws IOUtilsException { + EncodingUtils.encodeToStream(encodingRegistry, replaceBrokerConnection, os); + } + } +} -- cgit v1.2.3