summaryrefslogtreecommitdiffstats
path: root/sandbox/event/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManagerImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'sandbox/event/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManagerImpl.java')
-rw-r--r--sandbox/event/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManagerImpl.java669
1 files changed, 0 insertions, 669 deletions
diff --git a/sandbox/event/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManagerImpl.java b/sandbox/event/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManagerImpl.java
deleted file mode 100644
index 527cfbd667..0000000000
--- a/sandbox/event/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManagerImpl.java
+++ /dev/null
@@ -1,669 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tuscany.sca.binding.notification;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.servlet.ServletInputStream;
-import javax.servlet.ServletOutputStream;
-
-import org.apache.tuscany.sca.binding.notification.encoding.Broker;
-import org.apache.tuscany.sca.binding.notification.encoding.BrokerConsumerReference;
-import org.apache.tuscany.sca.binding.notification.encoding.BrokerID;
-import org.apache.tuscany.sca.binding.notification.encoding.BrokerProducerReference;
-import org.apache.tuscany.sca.binding.notification.encoding.Brokers;
-import org.apache.tuscany.sca.binding.notification.encoding.Constants;
-import org.apache.tuscany.sca.binding.notification.encoding.EncodingException;
-import org.apache.tuscany.sca.binding.notification.encoding.EncodingObject;
-import org.apache.tuscany.sca.binding.notification.encoding.EncodingRegistry;
-import org.apache.tuscany.sca.binding.notification.encoding.EncodingUtils;
-import org.apache.tuscany.sca.binding.notification.encoding.EndConsumers;
-import org.apache.tuscany.sca.binding.notification.encoding.EndProducers;
-import org.apache.tuscany.sca.binding.notification.encoding.EndpointAddress;
-import org.apache.tuscany.sca.binding.notification.encoding.EndpointReference;
-import org.apache.tuscany.sca.binding.notification.encoding.EndpointReferenceWrapper;
-import org.apache.tuscany.sca.binding.notification.encoding.NeighborBrokerConsumers;
-import org.apache.tuscany.sca.binding.notification.encoding.Neighbors;
-import org.apache.tuscany.sca.binding.notification.encoding.NewBroker;
-import org.apache.tuscany.sca.binding.notification.encoding.NewBrokerAck;
-import org.apache.tuscany.sca.binding.notification.encoding.NewBrokerResponse;
-import org.apache.tuscany.sca.binding.notification.encoding.NewConsumer;
-import org.apache.tuscany.sca.binding.notification.encoding.NewConsumerResponse;
-import org.apache.tuscany.sca.binding.notification.encoding.NewProducer;
-import org.apache.tuscany.sca.binding.notification.encoding.NewProducerResponse;
-import org.apache.tuscany.sca.binding.notification.encoding.RemoveBroker;
-import org.apache.tuscany.sca.binding.notification.encoding.RemovedBroker;
-import org.apache.tuscany.sca.binding.notification.encoding.ReplaceBrokerConnection;
-import org.apache.tuscany.sca.binding.notification.util.IOUtils;
-import org.apache.tuscany.sca.binding.notification.util.NotificationServlet;
-import org.apache.tuscany.sca.binding.notification.util.URIUtil;
-import org.apache.tuscany.sca.binding.notification.util.IOUtils.IOUtilsException;
-import org.apache.tuscany.sca.binding.notification.util.IOUtils.ReadableContinuation;
-import org.apache.tuscany.sca.binding.notification.util.IOUtils.Writeable;
-import org.apache.tuscany.sca.binding.notification.util.NotificationServlet.NotificationServletStreamHandler;
-import org.apache.tuscany.sca.host.http.ServletHost;
-
-/**
- * A notification type manager serves as a registry of producers and consumers, or brokers, for
- * any notification type. This class implements an interface that allows a reference provider
- * (a producer), a service provider (a consumer), or both (a broker, via the provider factory),
- * to access locally the ntm for its notification type, regardless of whether the ntm resides
- * locally or remotely.
- * At a given host there is only one reference provider and/or one service provider for any given
- * notification type. So, if the ntm for a notification type resides locally, then it is invoked
- * exclusively by either a reference provider (newProducer), a service provider (newConsumer), or
- * a provider factory (newBroker). And since these invocations occur when the providers are being
- * created then all three of consumerLists, producerLists and brokerLists must be null when these
- * invocations occur.
- *
- * @version $Rev$ $Date$
- */
-public class NotificationTypeManagerImpl implements NotificationTypeManager {
-
- private static final String ntmPathBase = "/ntm";
-
- private ServletHost servletHost;
- private EncodingRegistry encodingRegistry;
- private Map<URI, NotificationTypeManagerHandler> ntmHandlers;
-
- public NotificationTypeManagerImpl() {
- }
-
- public void setServletHost(ServletHost servletHost) {
- this.servletHost = servletHost;
- }
-
- public void setEncodingRegistry(EncodingRegistry encodingRegistry) {
- this.encodingRegistry = encodingRegistry;
- }
-
- public void init() {
- ntmHandlers = new HashMap<URI, NotificationTypeManagerHandler>();
- }
-
- public String newConsumer(URI notificationType, URL consumerUrl, URL remoteNtmUrl, List<URL> producerListResult) {
- if (ntmUrlIsRemote(consumerUrl, remoteNtmUrl)) {
- try {
- WriteableEPW wEPW = new WriteableEPW(new NewConsumer(), consumerUrl);
- InputStreamDecoder isd = new InputStreamDecoder();
- NewConsumerResponse ncr =
- (NewConsumerResponse)IOUtils.sendHttpRequest(remoteNtmUrl, Constants.NEW_CONSUMER_OP, wEPW, isd);
- String sequenceType = ncr.getSequenceType();
- if (Constants.EndProducers.equals(sequenceType) || Constants.BrokerProducers.equals(sequenceType)) {
- for (EndpointReference epr : ncr.getReferenceSequence()) {
- producerListResult.add(epr.getEndpointAddress().getAddress());
- }
- }
- return sequenceType;
- } catch(Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- } else {
- NotificationTypeManagerHandler ntmHandler = ntmHandlers.get(notificationType);
- if (ntmHandler != null) {
- throw new RuntimeException("Trying to deploy local consumer with existing local producer, consumer or broker");
- }
-
- createNtmHandler(consumerUrl.getAuthority(), notificationType, consumerUrl, null, null);
-
- return Constants.NoProducers;
- }
- }
-
- private void createNtmHandler(String ntmUriAuthority, URI notificationType, URL consumerUrl, URL producerUrl, BrokerStruct broker) {
- String ntmUri = "http://" + ntmUriAuthority + ntmPathBase + URIUtil.getPath(notificationType);
- NotificationTypeManagerHandler ntmh = new NotificationTypeManagerHandler(notificationType, consumerUrl, producerUrl, broker);
- ntmHandlers.put(notificationType, ntmh);
- servletHost.addServletMapping(ntmUri, new NotificationServlet(ntmh));
- }
-
- public String newProducer(URI notificationType, URL producerUrl, URL remoteNtmUrl, List<URL> consumerListResult) {
- if (ntmUrlIsRemote(producerUrl, remoteNtmUrl)) {
- try {
- WriteableEPW wEPW = new WriteableEPW(new NewProducer(), producerUrl);
- InputStreamDecoder isd = new InputStreamDecoder();
- NewProducerResponse npr =
- (NewProducerResponse)IOUtils.sendHttpRequest(remoteNtmUrl, Constants.NEW_PRODUCER_OP, wEPW, isd);
- String sequenceType = npr.getSequenceType();
- if (Constants.EndConsumers.equals(sequenceType) || Constants.BrokerConsumers.equals(sequenceType)) {
- for (EndpointReference epr : npr.getReferenceSequence()) {
- consumerListResult.add(epr.getEndpointAddress().getAddress());
- }
- }
- return sequenceType;
- } catch(Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- } else {
- NotificationTypeManagerHandler ntmHandler = ntmHandlers.get(notificationType);
- if (ntmHandler != null) {
- throw new RuntimeException("Trying to deploy local producer with existing local producer, consumer or broker");
- }
-
- createNtmHandler(producerUrl.getAuthority(), notificationType, null, producerUrl, null);
-
- return Constants.NoConsumers;
- }
- }
-
- public boolean newBroker(URI notificationType,
- URL consumerUrl,
- URL producerUrl,
- String brokerID,
- URL remoteNtmUrl,
- List<EndpointReference> consumerListResult,
- List<EndpointReference> producerListResult) {
- String ntmUriAuthority = producerUrl.getAuthority();
- if (!ntmUriAuthority.equals(consumerUrl.getAuthority())) {
- throw new RuntimeException("Producer url and consumer url do not match");
- }
- if (ntmUrlIsRemote(producerUrl, remoteNtmUrl)) {
- try {
- WriteableNewBroker wnb = new WriteableNewBroker(consumerUrl, producerUrl, brokerID);
- InputStreamDecoder isd = new InputStreamDecoder();
- NewBrokerResponse nbr =
- (NewBrokerResponse)IOUtils.sendHttpRequest(remoteNtmUrl, Constants.NEW_BROKER_OP, wnb, isd);
- if (nbr.isFirstBroker()) {
- if (nbr.getEndConsumers().getSequenceType().equals(Constants.EndConsumers)) {
- for (EndpointReference epr : nbr.getEndConsumers().getReferenceSequence()) {
- consumerListResult.add(epr);
- }
- }
- if (nbr.getEndProducers().getSequenceType().equals(Constants.EndProducers)) {
- for (EndpointReference epr : nbr.getEndProducers().getReferenceSequence()) {
- producerListResult.add(epr);
- }
- }
- } else {
- for (Broker broker : nbr.getBrokers().getBrokerSequence()) {
- consumerListResult.add(broker.getBrokerConsumerReference().getReference());
- producerListResult.add(broker.getBrokerProducerReference().getReference());
- }
- }
- return nbr.isFirstBroker();
- } catch(Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- } else {
- NotificationTypeManagerHandler ntmHandler = ntmHandlers.get(notificationType);
- if (ntmHandler != null) {
- throw new RuntimeException("Trying to deploy local broker with existing local producer, consumer or broker");
- }
-
- BrokerStruct broker = new BrokerStruct(consumerUrl, producerUrl, brokerID);
- createNtmHandler(ntmUriAuthority, notificationType, null, null, broker);
-
- return true;
- }
- }
-
- private boolean ntmUrlIsRemote(URL localUrl, URL ntmUrl) {
- if (ntmUrl == null) {
- return false;
- }
- if (localUrl.getPort() != ntmUrl.getPort()) {
- return true;
- }
- String remoteNtmUrlAuthority = ntmUrl.getAuthority();
- if (remoteNtmUrlAuthority.indexOf("localhost") >= 0) {
- return false;
- }
- return !localUrl.getAuthority().equals(remoteNtmUrlAuthority);
- }
-
- public void newBrokerAck(URL remoteNtmUrl) {
- try {
- IOUtils.sendHttpRequest(remoteNtmUrl, Constants.NEW_BROKER_ACK_OP, new WriteableNewBrokerAck(), null);
- } catch(Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
-
- public void removeBroker(EndpointReference brokerConsumerEpr, List<EndpointReference> neighborBrokerConsumerEprs, URL remoteNtmUrl) {
- WriteableRemoveBroker wrb = new WriteableRemoveBroker(brokerConsumerEpr, neighborBrokerConsumerEprs);
-
- try {
- IOUtils.sendHttpRequest(remoteNtmUrl, Constants.REMOVE_BROKER_OP, wrb, null);
- } catch(Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
-
- private class NotificationTypeManagerHandler implements NotificationServletStreamHandler {
-
- private URI notificationType;
- List<URL> consumerList;
- List<URL> producerList;
- List<BrokerStruct> brokerList;
- private NotificationTypeLock notificationTypeLock;
- private BrokerStruct pendingBroker;
-
- public NotificationTypeManagerHandler(URI notificationType) {
- this.notificationType = notificationType;
- this.notificationTypeLock = new NotificationTypeLock();
- this.pendingBroker = null;
- }
-
- public NotificationTypeManagerHandler(URI notificationType, URL consumerUrl, URL producerUrl, BrokerStruct broker) {
- this(notificationType);
- if (consumerUrl != null) {
- addConsumer(consumerUrl);
- } else if (producerUrl != null) {
- addProducer(producerUrl);
- } else if (broker != null) {
- addBroker(broker);
- }
- }
-
- private void addConsumer(URL consumerUrl) {
- if (consumerList == null) {
- consumerList = new ArrayList<URL>();
- }
- consumerList.add(consumerUrl);
- }
-
- private void addProducer(URL producerUrl) {
- if (producerList == null) {
- producerList = new ArrayList<URL>();
- }
- producerList.add(producerUrl);
- }
-
- private void addBroker(BrokerStruct broker) {
- if (brokerList == null) {
- brokerList = new ArrayList<BrokerStruct>();
- }
- brokerList.add(broker);
- }
-
- public void handle(Map<String, String> headers, ServletInputStream istream, int contentLength, ServletOutputStream ostream) {
- String opHeader = headers.get(IOUtils.Notification_Operation);
- EncodingObject eo = null;
- try {
- eo = EncodingUtils.decodeFromStream(encodingRegistry, istream);
- } catch(EncodingException e) {
- throw new RuntimeException(e);
- }
-
- if (Constants.NEW_CONSUMER_OP.equals(opHeader)) {
- handleNewConsumer((NewConsumer)eo, ostream);
- } else if(Constants.NEW_PRODUCER_OP.equals(opHeader)) {
- handleNewProducer((NewProducer)eo, ostream);
- } else if(Constants.NEW_BROKER_OP.equals(opHeader)) {
- handleNewBroker((NewBroker)eo, ostream);
- } else if (Constants.NEW_BROKER_ACK_OP.equals(opHeader)) {
- handleNewBrokerAck();
- } else if (Constants.REMOVE_BROKER_OP.equals(opHeader)) {
- handleRemoveBroker((RemoveBroker)eo);
- }
- }
-
- private void handleNewConsumer(NewConsumer nc, ServletOutputStream ostream) {
- synchronized(notificationTypeLock) {
- if (notificationTypeLock.isLocked) {
- try { notificationTypeLock.wait(); } catch(InterruptedException e) {}
- }
- URL consumerUrl = nc.getReference().getEndpointAddress().getAddress();
- if (brokerList == null) {
- addConsumer(consumerUrl);
- }
-
- NewConsumerResponse ncr = new NewConsumerResponse();
- if (producerList != null) {
- ncr.setSequenceType(Constants.EndProducers);
- for (URL producerUrl : producerList) {
- ncr.addReferenceToSequence(EncodingUtils.createEndpointReference(producerUrl, null));
- }
- } else if(brokerList != null) {
- ncr.setSequenceType(Constants.BrokerProducers);
- for (BrokerStruct broker : brokerList) {
- ncr.addReferenceToSequence(EncodingUtils.createEndpointReference(broker.producerUrl, null));
- }
- } else {
- ncr.setSequenceType(Constants.NoProducers);
- }
- try {
- EncodingUtils.encodeToStream(encodingRegistry, ncr, ostream);
- } catch(IOUtilsException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- private void handleNewProducer(NewProducer np, ServletOutputStream ostream) {
- synchronized(notificationTypeLock) {
- if (notificationTypeLock.isLocked) {
- try { notificationTypeLock.wait(); } catch(InterruptedException e) {}
- }
- URL producerUrl = np.getReference().getEndpointAddress().getAddress();
- if (brokerList == null) {
- addProducer(producerUrl);
- }
-
- NewProducerResponse npr = new NewProducerResponse();
- if (consumerList != null) {
- npr.setSequenceType(Constants.EndConsumers);
- for (URL consumerUrl : consumerList) {
- npr.addReferenceToSequence(EncodingUtils.createEndpointReference(consumerUrl, null));
- }
- } else if(brokerList != null) {
- npr.setSequenceType(Constants.BrokerConsumers);
- for (BrokerStruct broker : brokerList) {
- npr.addReferenceToSequence(EncodingUtils.createEndpointReference(broker.consumerUrl, null));
- }
- } else {
- npr.setSequenceType(Constants.NoConsumers);
- }
- try {
- EncodingUtils.encodeToStream(encodingRegistry, npr, ostream);
- } catch(IOUtilsException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- private void handleNewBroker(NewBroker nb, ServletOutputStream ostream) {
- synchronized(notificationTypeLock) {
- if (notificationTypeLock.isLocked) {
- try { notificationTypeLock.wait(); } catch(InterruptedException e) {}
- }
- NewBrokerResponse nbr = new NewBrokerResponse();
- if (consumerList != null || producerList != null || brokerList == null) {
- nbr.setFirstBroker(true);
- EndConsumers endConsumers = new EndConsumers();
- if (consumerList != null) {
- endConsumers.setSequenceType(Constants.EndConsumers);
- for (URL consumerUrl : consumerList) {
- endConsumers.addReferenceToSequence(EncodingUtils.createEndpointReference(consumerUrl, null));
- }
- } else {
- endConsumers.setSequenceType(Constants.NoConsumers);
- }
- nbr.setEndConsumers(endConsumers);
- EndProducers endProducers = new EndProducers();
- if (producerList != null) {
- endProducers.setSequenceType(Constants.EndProducers);
- for (URL producerUrl : producerList) {
- endProducers.addReferenceToSequence(EncodingUtils.createEndpointReference(producerUrl, null));
- }
- } else {
- endProducers.setSequenceType(Constants.NoProducers);
- }
- nbr.setEndProducers(endProducers);
- } else {
- nbr.setFirstBroker(false);
- Brokers brokers = new Brokers();
- for (BrokerStruct brokerStruct : brokerList) {
- Broker brokerElt = new Broker();
- BrokerConsumerReference bcr = new BrokerConsumerReference();
- bcr.setReference(EncodingUtils.createEndpointReference(brokerStruct.consumerUrl, brokerStruct.brokerID));
- brokerElt.setBrokerConsumerReference(bcr);
-
- BrokerProducerReference bpr = new BrokerProducerReference();
- bpr.setReference(EncodingUtils.createEndpointReference(brokerStruct.producerUrl, brokerStruct.brokerID));
- brokerElt.setBrokerProducerReference(bpr);
- brokers.addBrokerToSequence(brokerElt);
- }
- nbr.setBrokers(brokers);
- }
- EndpointReference consumerEPR = nb.getBrokerConsumerReference().getReference();
- URL consumerUrl = consumerEPR.getEndpointAddress().getAddress();
- BrokerID consumerBrokerID = consumerEPR.getReferenceProperties().getProperty(BrokerID.class);
- EndpointReference producerEPR = nb.getBrokerProducerReference().getReference();
- URL producerUrl = producerEPR.getEndpointAddress().getAddress();
- BrokerID producerBrokerID = producerEPR.getReferenceProperties().getProperty(BrokerID.class);
- if (consumerBrokerID == null ||
- producerBrokerID == null ||
- !consumerBrokerID.getID().equals(producerBrokerID.getID())) {
- throw new RuntimeException("Producer and consumer broker ids do not match");
- }
- // only add broker if consumerList == null && producerList == null
- // otherwise, make it a pending broker and wait for ack
- // TODO block for a configurable amount of time
- BrokerStruct broker = new BrokerStruct(consumerUrl, producerUrl, consumerBrokerID.getID());
- if (consumerList == null && producerList == null) {
- addBroker(broker);
- } else {
- pendingBroker = broker;
- notificationTypeLock.isLocked = true;
- }
- try {
- EncodingUtils.encodeToStream(encodingRegistry, nbr, ostream);
- } catch(IOUtilsException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- private void handleNewBrokerAck() {
- synchronized(notificationTypeLock) {
- if (!notificationTypeLock.isLocked) {
- notificationTypeLock.notifyAll();
- throw new RuntimeException("Notification type should be locked");
- }
- if (brokerList != null) {
- notificationTypeLock.isLocked = false;
- notificationTypeLock.notifyAll();
- throw new RuntimeException("Can't add pending broker to non-empty broker list");
- }
- if (pendingBroker == null) {
- notificationTypeLock.isLocked = false;
- notificationTypeLock.notifyAll();
- throw new RuntimeException("Missing pending broker");
- }
- addBroker(pendingBroker);
- consumerList = null;
- producerList = null;
- pendingBroker = null;
- notificationTypeLock.isLocked = false;
- notificationTypeLock.notifyAll();
- }
- }
-
- private void handleRemoveBroker(RemoveBroker rb) {
- synchronized(notificationTypeLock) {
- if (notificationTypeLock.isLocked) {
- try { notificationTypeLock.wait(); } catch(InterruptedException e) {}
- }
-
- if (brokerList == null) {
- throw new RuntimeException("No broker to remove for [" + notificationType + "]");
- }
-
- NeighborBrokerConsumers nbcs = rb.getNeighborBrokerConsumers();
- EndpointReference rbEpr = rb.getBrokerConsumerReference().getReference();
- if (nbcs != null && nbcs.getReferenceSequence() != null) {
- List<Broker> neighborBrokers = new ArrayList<Broker>();
- for (EndpointReference neighborBrokerConsumerEpr : nbcs.getReferenceSequence()) {
- BrokerStruct neighborBrokerStruct = null;
- URL neighborBrokerConsumerEprUrl = neighborBrokerConsumerEpr.getEndpointAddress().getAddress();
- for (BrokerStruct brokerStruct : brokerList) {
- if (brokerStruct.consumerUrl.equals(neighborBrokerConsumerEprUrl)) {
- neighborBrokerStruct = brokerStruct;
- break;
- }
- }
- if (neighborBrokerStruct == null) {
- throw new RuntimeException("Can't find neighbor broker for consumer EPR [" +
- neighborBrokerConsumerEprUrl + "]");
- }
- BrokerConsumerReference bcr = new BrokerConsumerReference();
- bcr.setReference(EncodingUtils.createEndpointReference(neighborBrokerStruct.consumerUrl, neighborBrokerStruct.brokerID));
- BrokerProducerReference bpr = new BrokerProducerReference();
- bpr.setReference(EncodingUtils.createEndpointReference(neighborBrokerStruct.producerUrl, neighborBrokerStruct.brokerID));
- Broker neighborBroker = new Broker();
- neighborBroker.setBrokerConsumerReference(bcr);
- neighborBroker.setBrokerProducerReference(bpr);
- neighborBrokers.add(neighborBroker);
- }
- int lastIndex = neighborBrokers.size() - 1;
- for (int index = lastIndex; index >= 0; index--) {
- List<Broker> writeableNeighborBrokers = ((index > 0) ? neighborBrokers.subList(0, index) : null);
- WriteableReplaceBrokerConnection wrbc = new WriteableReplaceBrokerConnection(rbEpr, writeableNeighborBrokers);
- URL targetUrl =
- neighborBrokers.get(index).getBrokerProducerReference().getReference().getEndpointAddress().getAddress();
- try {
- IOUtils.sendHttpRequest(targetUrl, Constants.REPLACE_BROKER_CONNECTION_OP, wrbc, null);
- } catch(Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
- }
-
- BrokerStruct removedBrokerStruct = null;
- URL rbEprUrl = rbEpr.getEndpointAddress().getAddress();
- for (BrokerStruct brokerSruct : brokerList) {
- if (brokerSruct.consumerUrl.equals(rbEprUrl)) {
- removedBrokerStruct = brokerSruct;
- break;
- }
- }
- if (removedBrokerStruct == null) {
- throw new RuntimeException("Can't find broker to remove for EPR [" + rbEprUrl + "]");
- }
- if(!brokerList.remove(removedBrokerStruct)) {
- throw new RuntimeException("Broker was not removed");
- }
- }
- }
- }
-
- class NotificationTypeLock {
- public boolean isLocked;
- }
-
- class WriteableEPW implements Writeable {
- private EndpointReferenceWrapper epw;
-
- public WriteableEPW(EndpointReferenceWrapper epw, URL url) {
- EndpointAddress epa = new EndpointAddress();
- epa.setAddress(url);
- EndpointReference epr = new EndpointReference();
- epr.setEndpointAddress(epa);
- epw.setReference(epr);
- this.epw = epw;
- }
-
- public void write(OutputStream os) throws IOUtilsException {
- EncodingUtils.encodeToStream(encodingRegistry, epw, os);
- }
- }
-
- class InputStreamDecoder implements ReadableContinuation {
-
- public Object read(InputStream istream) throws IOUtilsException {
- try {
- return EncodingUtils.decodeFromStream(encodingRegistry, istream);
- } catch(EncodingException e) {
- throw new IOUtilsException(e);
- }
- }
- }
-
- class BrokerStruct {
- public URL consumerUrl;
- public URL producerUrl;
- public String brokerID;
-
- public BrokerStruct(URL consumerUrl, URL producerUrl, String brokerID) {
- this.consumerUrl = consumerUrl;
- this.producerUrl = producerUrl;
- this.brokerID = brokerID;
- }
- }
-
- class WriteableNewBroker implements Writeable {
- private NewBroker newBroker;
-
- public WriteableNewBroker(URL consumerUrl, URL producerUrl, String brokerID) {
- newBroker = new NewBroker();
- BrokerConsumerReference bcr = new BrokerConsumerReference();
- bcr.setReference(EncodingUtils.createEndpointReference(consumerUrl, brokerID));
- newBroker.setBrokerConsumerReference(bcr);
-
- BrokerProducerReference bpr = new BrokerProducerReference();
- bpr.setReference(EncodingUtils.createEndpointReference(producerUrl, brokerID));
- newBroker.setBrokerProducerReference(bpr);
- }
-
- public void write(OutputStream os) throws IOUtilsException {
- EncodingUtils.encodeToStream(encodingRegistry, newBroker, os);
- }
- }
-
- class WriteableNewBrokerAck implements Writeable {
- public void write(OutputStream os) throws IOUtilsException {
- EncodingUtils.encodeToStream(encodingRegistry, new NewBrokerAck(), os);
- }
- }
-
- class WriteableRemoveBroker implements Writeable {
- private RemoveBroker removeBroker;
-
- public WriteableRemoveBroker(EndpointReference brokerConsumerEpr, List<EndpointReference> neighborBrokerConsumerEprs) {
- removeBroker = new RemoveBroker();
- BrokerConsumerReference brokerConsumerReference = new BrokerConsumerReference();
- brokerConsumerReference.setReference(brokerConsumerEpr);
- removeBroker.setBrokerConsumerReference(brokerConsumerReference);
- if (neighborBrokerConsumerEprs != null) {
- NeighborBrokerConsumers neighborBrokerConsumers = new NeighborBrokerConsumers();
- neighborBrokerConsumers.setReferenceSequence(neighborBrokerConsumerEprs);
- neighborBrokerConsumers.setSequenceType(Constants.BrokerConsumers);
- removeBroker.setNeighborBrokerConsumers(neighborBrokerConsumers);
- }
- }
-
- public void write(OutputStream os) throws IOUtilsException {
- EncodingUtils.encodeToStream(encodingRegistry, removeBroker, os);
- }
- }
-
- class WriteableReplaceBrokerConnection implements Writeable {
- private ReplaceBrokerConnection replaceBrokerConnection;
-
- public WriteableReplaceBrokerConnection(EndpointReference removedBrokerEpr, List<Broker> brokerSequence) {
- replaceBrokerConnection = new ReplaceBrokerConnection();
- RemovedBroker removedBroker = new RemovedBroker();
- removedBroker.setReference(removedBrokerEpr);
- replaceBrokerConnection.setRemovedBroker(removedBroker);
- if (brokerSequence != null) {
- Neighbors neighbors = new Neighbors();
- neighbors.setBrokerSequence(brokerSequence);
- replaceBrokerConnection.setNeighbors(neighbors);
- }
- }
-
- public void write(OutputStream os) throws IOUtilsException {
- EncodingUtils.encodeToStream(encodingRegistry, replaceBrokerConnection, os);
- }
- }
-}