summaryrefslogtreecommitdiffstats
path: root/sca-java-1.x/branches/sca-java-1.6.2/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java
diff options
context:
space:
mode:
Diffstat (limited to 'sca-java-1.x/branches/sca-java-1.6.2/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java')
-rw-r--r--sca-java-1.x/branches/sca-java-1.6.2/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java317
1 files changed, 317 insertions, 0 deletions
diff --git a/sca-java-1.x/branches/sca-java-1.6.2/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java b/sca-java-1.x/branches/sca-java-1.6.2/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java
new file mode 100644
index 0000000000..4511fb454b
--- /dev/null
+++ b/sca-java-1.x/branches/sca-java-1.6.2/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletOutputStream;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+import org.apache.tuscany.sca.binding.notification.encoding.BrokerConsumerReference;
+import org.apache.tuscany.sca.binding.notification.encoding.BrokerID;
+import org.apache.tuscany.sca.binding.notification.encoding.ConnectionOverride;
+import org.apache.tuscany.sca.binding.notification.encoding.Constants;
+import org.apache.tuscany.sca.binding.notification.encoding.ConsumerReference;
+import org.apache.tuscany.sca.binding.notification.encoding.EncodingRegistry;
+import org.apache.tuscany.sca.binding.notification.encoding.EncodingUtils;
+import org.apache.tuscany.sca.binding.notification.encoding.EndpointAddress;
+import org.apache.tuscany.sca.binding.notification.encoding.EndpointReference;
+import org.apache.tuscany.sca.binding.notification.encoding.ReferenceProperties;
+import org.apache.tuscany.sca.binding.notification.encoding.Subscribe;
+import org.apache.tuscany.sca.binding.notification.util.IOUtils;
+import org.apache.tuscany.sca.binding.notification.util.NotificationServlet;
+import org.apache.tuscany.sca.binding.notification.util.URIUtil;
+import org.apache.tuscany.sca.binding.notification.util.IOUtils.IOUtilsException;
+import org.apache.tuscany.sca.binding.notification.util.IOUtils.Writeable;
+import org.apache.tuscany.sca.binding.notification.util.NotificationServlet.NotificationServletStreamHandler;
+import org.apache.tuscany.sca.core.invocation.MessageImpl;
+import org.apache.tuscany.sca.host.http.ServletHost;
+import org.apache.tuscany.sca.interfacedef.Interface;
+import org.apache.tuscany.sca.interfacedef.InterfaceContract;
+import org.apache.tuscany.sca.interfacedef.Operation;
+import org.apache.tuscany.sca.invocation.InvocationChain;
+import org.apache.tuscany.sca.invocation.Message;
+import org.apache.tuscany.sca.provider.ServiceBindingProvider;
+import org.apache.tuscany.sca.runtime.RuntimeComponent;
+import org.apache.tuscany.sca.runtime.RuntimeComponentService;
+import org.apache.tuscany.sca.runtime.RuntimeWire;
+
+/**
+ * The runtime representation of the local service binding
+ *
+ * @version $Rev$ $Date$
+ */
+public class NotificationServiceBindingProvider
+ implements ServiceBindingProvider, NotificationServletStreamHandler {
+
+ private RuntimeWire wire;
+ private NotificationBinding notificationBinding;
+ private RuntimeComponentService service;
+ private ServletHost servletHost;
+ private NotificationTypeManager ntm;
+ private EncodingRegistry encodingRegistry;
+ private URI notificationType;
+ private URL myUrl;
+ private URL remoteNtmUrl;
+ private static final String consumerPathBase = "/consumer";
+ private boolean started;
+ private NotificationBrokerManager brokerManager;
+ private String brokerID;
+
+ public NotificationServiceBindingProvider(NotificationBinding notificationBinding,
+ RuntimeComponent component,
+ RuntimeComponentService service,
+ ServletHost servletHost,
+ NotificationTypeManager ntm,
+ EncodingRegistry encodingRegistry,
+ String httpUrl,
+ NotificationBrokerManager brokerManager) {
+ this.notificationBinding = notificationBinding;
+ this.service = service;
+ this.servletHost = servletHost;
+ this.ntm = ntm;
+ this.encodingRegistry = encodingRegistry;
+ this.notificationType = notificationBinding.getNotificationType();
+ String ntmAddress = notificationBinding.getNtmAddress();
+ String notificationTypePath = URIUtil.getPath(notificationType);
+ try {
+ this.myUrl = new URL(httpUrl + consumerPathBase + notificationTypePath);
+ remoteNtmUrl = null;
+ if (ntmAddress != null && notificationType != null) {
+ remoteNtmUrl = new URL(ntmAddress + notificationTypePath);
+ }
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ this.started = false;
+ this.brokerManager = brokerManager;
+ this.brokerID = null;
+
+ URI uri = URI.create(component.getURI() + "/" + notificationBinding.getName());
+ notificationBinding.setURI(uri.toString());
+ Interface interfaze = service.getInterfaceContract().getInterface();
+ interfaze.resetDataBinding(OMElement.class.getName());
+ for (Operation operation : interfaze.getOperations()) {
+ operation.setNonBlocking(false);
+ }
+ }
+
+ public NotificationBinding getBinding() {
+ return notificationBinding;
+ }
+
+ public boolean isStarted() {
+ return started;
+ }
+
+ public URL getURL() {
+ return myUrl;
+ }
+
+ public InterfaceContract getBindingInterfaceContract() {
+ return service.getInterfaceContract();
+ }
+
+ public boolean supportsOneWayInvocation() {
+ return false;
+ }
+
+ public void start() {
+ if (started) {
+ return;
+ }
+
+ RuntimeComponentService componentService = (RuntimeComponentService) service;
+ wire = componentService.getRuntimeWire(notificationBinding);
+
+ for (InvocationChain ch : wire.getInvocationChains()) {
+ ch.setAllowsPassByReference(true);
+ }
+
+ brokerManager.serviceProviderStarted(notificationType, this, remoteNtmUrl);
+ started = true;
+ }
+
+ public void stop() {
+ }
+
+ public void deployConsumer() {
+ WriteableSubscribe ws = new WriteableSubscribe(myUrl, null);
+ List<URL> producerList = new ArrayList<URL>();
+ String sequenceType = ntm.newConsumer(notificationType, myUrl, remoteNtmUrl, producerList);
+ if (Constants.EndProducers.equals(sequenceType)) {
+ for (URL producerUrl : producerList) {
+ subscribeWithProducer(producerUrl, null, ws);
+ }
+ } else if (Constants.BrokerProducers.equals(sequenceType)) {
+ // Pick a broker producer, for now the first one
+ URL producerUrl = producerList.get(0);
+ subscribeWithProducer(producerUrl, null, ws);
+ }
+
+ servletHost.addServletMapping(myUrl.toString(), new NotificationServlet(this));
+ }
+
+ protected void subscribeWithProducer(URL producerUrl, String brokerID, WriteableSubscribe ws) {
+ if (ws == null) {
+ ws = new WriteableSubscribe(myUrl, brokerID);
+ }
+ try {
+ IOUtils.sendHttpRequest(producerUrl, Constants.SUBSCRIBE_OP, ws, null);
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void deployBroker(String brokerID, EndpointReference brokerProducerEPR, List<EndpointReference> producerList) {
+ if (brokerProducerEPR != null) {
+ subscribeWithProducer(brokerProducerEPR.getEndpointAddress().getAddress(), brokerID, null);
+ }
+ this.brokerID = brokerID;
+ if (producerList != null && !producerList.isEmpty()) {
+ WriteableConnectionOverride wco = new WriteableConnectionOverride(myUrl, brokerID);
+ for (EndpointReference producerEPR : producerList) {
+ try {
+ IOUtils.sendHttpRequest(producerEPR.getEndpointAddress().getAddress(), Constants.CONNECTION_OVERRIDE_OP, wco, null);
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ servletHost.addServletMapping(myUrl.toString(), new NotificationServlet(this));
+ }
+
+ public void replaceBrokerConnection(EndpointReference chosenBrokerProducerEpr) {
+ if (brokerID == null) {
+ throw new RuntimeException("Missing broker id");
+ }
+ URL producerUrl = chosenBrokerProducerEpr.getEndpointAddress().getAddress();
+ subscribeWithProducer(producerUrl, brokerID, null);
+ }
+
+ public void handle(Map<String, String> headers, ServletInputStream istream, int contentLength, ServletOutputStream ostream) {
+ String opHeader = headers.get(IOUtils.Notification_Operation);
+ String incomingBrokerID = headers.get(Constants.Broker_ID);
+ if (opHeader == null) {
+ throw new RuntimeException("Missing operation header");
+ }
+ if (wire == null) {
+ throw new RuntimeException("Missing wire");
+ }
+ InvocationChain chain = null;
+ for (InvocationChain ch : wire.getInvocationChains()) {
+ // We may want to use more than just the op name
+ if(ch.getTargetOperation().getName().equals(opHeader)) {
+ chain = ch;
+ break;
+ }
+ }
+ if (chain == null) {
+ throw new RuntimeException("Can't find invocation chain match for [" + opHeader + "]");
+ }
+ byte[] payload = null;
+ try {
+ payload = IOUtils.readFully(istream, contentLength);
+ } catch(IOException e) {
+ throw new RuntimeException(e);
+ }
+ Object[] args = getArgsFromByteArray(payload, incomingBrokerID);
+
+ invoke(chain, args);
+
+ // Doing nothing to ostream is equivalent to returning null
+ }
+
+ private Object[] getArgsFromByteArray(byte[] payload, String incomingBrokerID) {
+ try {
+ StAXOMBuilder builder = new StAXOMBuilder(new ByteArrayInputStream(payload));
+ OMElement element = builder.getDocumentElement();
+ return new Object[] { element };
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected void invoke(InvocationChain chain, Object[] args) {
+ Message msg = new MessageImpl();
+ msg.setBody(args);
+ chain.getHeadInvoker().invoke(msg);
+ }
+
+ class WriteableSubscribe implements Writeable {
+
+ private Subscribe sub;
+
+ public WriteableSubscribe(URL url, String brokerID) {
+ EndpointAddress epa = new EndpointAddress();
+ epa.setAddress(url);
+ EndpointReference epr = new EndpointReference();
+ epr.setEndpointAddress(epa);
+ if (brokerID != null) {
+ BrokerID cbi = new BrokerID();
+ cbi.setID(brokerID);
+ ReferenceProperties crp = new ReferenceProperties();
+ crp.addProperty(cbi);
+ epr.setReferenceProperties(crp);
+ }
+ ConsumerReference cr = new ConsumerReference();
+ cr.setReference(epr);
+ sub = new Subscribe();
+ sub.setConsumerReference(cr);
+ }
+
+ public void write(OutputStream os) throws IOUtilsException {
+ EncodingUtils.encodeToStream(encodingRegistry, sub, os);
+ }
+ }
+
+ class WriteableConnectionOverride implements Writeable {
+
+ private ConnectionOverride connectionOverride;
+
+ public WriteableConnectionOverride(URL brokerConsumerUrl, String brokerID) {
+ EndpointAddress epa = new EndpointAddress();
+ epa.setAddress(brokerConsumerUrl);
+ EndpointReference brokerConsumerEPR = new EndpointReference();
+ brokerConsumerEPR.setEndpointAddress(epa);
+ BrokerID cbi = new BrokerID();
+ cbi.setID(brokerID);
+ ReferenceProperties crp = new ReferenceProperties();
+ crp.addProperty(cbi);
+ brokerConsumerEPR.setReferenceProperties(crp);
+ BrokerConsumerReference brokerConsumerReference = new BrokerConsumerReference();
+ brokerConsumerReference.setReference(brokerConsumerEPR);
+ connectionOverride = new ConnectionOverride();
+ connectionOverride.setBrokerConsumerReference(brokerConsumerReference);
+ }
+
+ public void write(OutputStream os) throws IOUtilsException {
+ EncodingUtils.encodeToStream(encodingRegistry, connectionOverride, os);
+ }
+ }
+}