summaryrefslogtreecommitdiffstats
path: root/sandbox/event/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java
diff options
context:
space:
mode:
Diffstat (limited to 'sandbox/event/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java')
-rw-r--r--sandbox/event/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java317
1 files changed, 0 insertions, 317 deletions
diff --git a/sandbox/event/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java b/sandbox/event/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java
deleted file mode 100644
index 4511fb454b..0000000000
--- a/sandbox/event/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tuscany.sca.binding.notification;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import javax.servlet.ServletInputStream;
-import javax.servlet.ServletOutputStream;
-
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.impl.builder.StAXOMBuilder;
-import org.apache.tuscany.sca.binding.notification.encoding.BrokerConsumerReference;
-import org.apache.tuscany.sca.binding.notification.encoding.BrokerID;
-import org.apache.tuscany.sca.binding.notification.encoding.ConnectionOverride;
-import org.apache.tuscany.sca.binding.notification.encoding.Constants;
-import org.apache.tuscany.sca.binding.notification.encoding.ConsumerReference;
-import org.apache.tuscany.sca.binding.notification.encoding.EncodingRegistry;
-import org.apache.tuscany.sca.binding.notification.encoding.EncodingUtils;
-import org.apache.tuscany.sca.binding.notification.encoding.EndpointAddress;
-import org.apache.tuscany.sca.binding.notification.encoding.EndpointReference;
-import org.apache.tuscany.sca.binding.notification.encoding.ReferenceProperties;
-import org.apache.tuscany.sca.binding.notification.encoding.Subscribe;
-import org.apache.tuscany.sca.binding.notification.util.IOUtils;
-import org.apache.tuscany.sca.binding.notification.util.NotificationServlet;
-import org.apache.tuscany.sca.binding.notification.util.URIUtil;
-import org.apache.tuscany.sca.binding.notification.util.IOUtils.IOUtilsException;
-import org.apache.tuscany.sca.binding.notification.util.IOUtils.Writeable;
-import org.apache.tuscany.sca.binding.notification.util.NotificationServlet.NotificationServletStreamHandler;
-import org.apache.tuscany.sca.core.invocation.MessageImpl;
-import org.apache.tuscany.sca.host.http.ServletHost;
-import org.apache.tuscany.sca.interfacedef.Interface;
-import org.apache.tuscany.sca.interfacedef.InterfaceContract;
-import org.apache.tuscany.sca.interfacedef.Operation;
-import org.apache.tuscany.sca.invocation.InvocationChain;
-import org.apache.tuscany.sca.invocation.Message;
-import org.apache.tuscany.sca.provider.ServiceBindingProvider;
-import org.apache.tuscany.sca.runtime.RuntimeComponent;
-import org.apache.tuscany.sca.runtime.RuntimeComponentService;
-import org.apache.tuscany.sca.runtime.RuntimeWire;
-
-/**
- * The runtime representation of the local service binding
- *
- * @version $Rev$ $Date$
- */
-public class NotificationServiceBindingProvider
- implements ServiceBindingProvider, NotificationServletStreamHandler {
-
- private RuntimeWire wire;
- private NotificationBinding notificationBinding;
- private RuntimeComponentService service;
- private ServletHost servletHost;
- private NotificationTypeManager ntm;
- private EncodingRegistry encodingRegistry;
- private URI notificationType;
- private URL myUrl;
- private URL remoteNtmUrl;
- private static final String consumerPathBase = "/consumer";
- private boolean started;
- private NotificationBrokerManager brokerManager;
- private String brokerID;
-
- public NotificationServiceBindingProvider(NotificationBinding notificationBinding,
- RuntimeComponent component,
- RuntimeComponentService service,
- ServletHost servletHost,
- NotificationTypeManager ntm,
- EncodingRegistry encodingRegistry,
- String httpUrl,
- NotificationBrokerManager brokerManager) {
- this.notificationBinding = notificationBinding;
- this.service = service;
- this.servletHost = servletHost;
- this.ntm = ntm;
- this.encodingRegistry = encodingRegistry;
- this.notificationType = notificationBinding.getNotificationType();
- String ntmAddress = notificationBinding.getNtmAddress();
- String notificationTypePath = URIUtil.getPath(notificationType);
- try {
- this.myUrl = new URL(httpUrl + consumerPathBase + notificationTypePath);
- remoteNtmUrl = null;
- if (ntmAddress != null && notificationType != null) {
- remoteNtmUrl = new URL(ntmAddress + notificationTypePath);
- }
- } catch(Exception e) {
- throw new RuntimeException(e);
- }
- this.started = false;
- this.brokerManager = brokerManager;
- this.brokerID = null;
-
- URI uri = URI.create(component.getURI() + "/" + notificationBinding.getName());
- notificationBinding.setURI(uri.toString());
- Interface interfaze = service.getInterfaceContract().getInterface();
- interfaze.resetDataBinding(OMElement.class.getName());
- for (Operation operation : interfaze.getOperations()) {
- operation.setNonBlocking(false);
- }
- }
-
- public NotificationBinding getBinding() {
- return notificationBinding;
- }
-
- public boolean isStarted() {
- return started;
- }
-
- public URL getURL() {
- return myUrl;
- }
-
- public InterfaceContract getBindingInterfaceContract() {
- return service.getInterfaceContract();
- }
-
- public boolean supportsOneWayInvocation() {
- return false;
- }
-
- public void start() {
- if (started) {
- return;
- }
-
- RuntimeComponentService componentService = (RuntimeComponentService) service;
- wire = componentService.getRuntimeWire(notificationBinding);
-
- for (InvocationChain ch : wire.getInvocationChains()) {
- ch.setAllowsPassByReference(true);
- }
-
- brokerManager.serviceProviderStarted(notificationType, this, remoteNtmUrl);
- started = true;
- }
-
- public void stop() {
- }
-
- public void deployConsumer() {
- WriteableSubscribe ws = new WriteableSubscribe(myUrl, null);
- List<URL> producerList = new ArrayList<URL>();
- String sequenceType = ntm.newConsumer(notificationType, myUrl, remoteNtmUrl, producerList);
- if (Constants.EndProducers.equals(sequenceType)) {
- for (URL producerUrl : producerList) {
- subscribeWithProducer(producerUrl, null, ws);
- }
- } else if (Constants.BrokerProducers.equals(sequenceType)) {
- // Pick a broker producer, for now the first one
- URL producerUrl = producerList.get(0);
- subscribeWithProducer(producerUrl, null, ws);
- }
-
- servletHost.addServletMapping(myUrl.toString(), new NotificationServlet(this));
- }
-
- protected void subscribeWithProducer(URL producerUrl, String brokerID, WriteableSubscribe ws) {
- if (ws == null) {
- ws = new WriteableSubscribe(myUrl, brokerID);
- }
- try {
- IOUtils.sendHttpRequest(producerUrl, Constants.SUBSCRIBE_OP, ws, null);
- } catch(Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public void deployBroker(String brokerID, EndpointReference brokerProducerEPR, List<EndpointReference> producerList) {
- if (brokerProducerEPR != null) {
- subscribeWithProducer(brokerProducerEPR.getEndpointAddress().getAddress(), brokerID, null);
- }
- this.brokerID = brokerID;
- if (producerList != null && !producerList.isEmpty()) {
- WriteableConnectionOverride wco = new WriteableConnectionOverride(myUrl, brokerID);
- for (EndpointReference producerEPR : producerList) {
- try {
- IOUtils.sendHttpRequest(producerEPR.getEndpointAddress().getAddress(), Constants.CONNECTION_OVERRIDE_OP, wco, null);
- } catch(Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
- servletHost.addServletMapping(myUrl.toString(), new NotificationServlet(this));
- }
-
- public void replaceBrokerConnection(EndpointReference chosenBrokerProducerEpr) {
- if (brokerID == null) {
- throw new RuntimeException("Missing broker id");
- }
- URL producerUrl = chosenBrokerProducerEpr.getEndpointAddress().getAddress();
- subscribeWithProducer(producerUrl, brokerID, null);
- }
-
- public void handle(Map<String, String> headers, ServletInputStream istream, int contentLength, ServletOutputStream ostream) {
- String opHeader = headers.get(IOUtils.Notification_Operation);
- String incomingBrokerID = headers.get(Constants.Broker_ID);
- if (opHeader == null) {
- throw new RuntimeException("Missing operation header");
- }
- if (wire == null) {
- throw new RuntimeException("Missing wire");
- }
- InvocationChain chain = null;
- for (InvocationChain ch : wire.getInvocationChains()) {
- // We may want to use more than just the op name
- if(ch.getTargetOperation().getName().equals(opHeader)) {
- chain = ch;
- break;
- }
- }
- if (chain == null) {
- throw new RuntimeException("Can't find invocation chain match for [" + opHeader + "]");
- }
- byte[] payload = null;
- try {
- payload = IOUtils.readFully(istream, contentLength);
- } catch(IOException e) {
- throw new RuntimeException(e);
- }
- Object[] args = getArgsFromByteArray(payload, incomingBrokerID);
-
- invoke(chain, args);
-
- // Doing nothing to ostream is equivalent to returning null
- }
-
- private Object[] getArgsFromByteArray(byte[] payload, String incomingBrokerID) {
- try {
- StAXOMBuilder builder = new StAXOMBuilder(new ByteArrayInputStream(payload));
- OMElement element = builder.getDocumentElement();
- return new Object[] { element };
- } catch(Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- protected void invoke(InvocationChain chain, Object[] args) {
- Message msg = new MessageImpl();
- msg.setBody(args);
- chain.getHeadInvoker().invoke(msg);
- }
-
- class WriteableSubscribe implements Writeable {
-
- private Subscribe sub;
-
- public WriteableSubscribe(URL url, String brokerID) {
- EndpointAddress epa = new EndpointAddress();
- epa.setAddress(url);
- EndpointReference epr = new EndpointReference();
- epr.setEndpointAddress(epa);
- if (brokerID != null) {
- BrokerID cbi = new BrokerID();
- cbi.setID(brokerID);
- ReferenceProperties crp = new ReferenceProperties();
- crp.addProperty(cbi);
- epr.setReferenceProperties(crp);
- }
- ConsumerReference cr = new ConsumerReference();
- cr.setReference(epr);
- sub = new Subscribe();
- sub.setConsumerReference(cr);
- }
-
- public void write(OutputStream os) throws IOUtilsException {
- EncodingUtils.encodeToStream(encodingRegistry, sub, os);
- }
- }
-
- class WriteableConnectionOverride implements Writeable {
-
- private ConnectionOverride connectionOverride;
-
- public WriteableConnectionOverride(URL brokerConsumerUrl, String brokerID) {
- EndpointAddress epa = new EndpointAddress();
- epa.setAddress(brokerConsumerUrl);
- EndpointReference brokerConsumerEPR = new EndpointReference();
- brokerConsumerEPR.setEndpointAddress(epa);
- BrokerID cbi = new BrokerID();
- cbi.setID(brokerID);
- ReferenceProperties crp = new ReferenceProperties();
- crp.addProperty(cbi);
- brokerConsumerEPR.setReferenceProperties(crp);
- BrokerConsumerReference brokerConsumerReference = new BrokerConsumerReference();
- brokerConsumerReference.setReference(brokerConsumerEPR);
- connectionOverride = new ConnectionOverride();
- connectionOverride.setBrokerConsumerReference(brokerConsumerReference);
- }
-
- public void write(OutputStream os) throws IOUtilsException {
- EncodingUtils.encodeToStream(encodingRegistry, connectionOverride, os);
- }
- }
-}