From 89d72264d0af4c65c9bf0b7a6f823f13d69162aa Mon Sep 17 00:00:00 2001 From: lresende Date: Wed, 11 Nov 2009 23:07:53 +0000 Subject: Moving 1.x branches git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@835127 13f79535-47bb-0310-9956-ffa450edef68 --- .../NotificationServiceBindingProvider.java | 317 --------------------- 1 file changed, 317 deletions(-) delete mode 100644 branches/sca-java-1.3/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java (limited to 'branches/sca-java-1.3/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java') diff --git a/branches/sca-java-1.3/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java b/branches/sca-java-1.3/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java deleted file mode 100644 index 4511fb454b..0000000000 --- a/branches/sca-java-1.3/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java +++ /dev/null @@ -1,317 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.tuscany.sca.binding.notification; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.net.URI; -import java.net.URL; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import javax.servlet.ServletInputStream; -import javax.servlet.ServletOutputStream; - -import org.apache.axiom.om.OMElement; -import org.apache.axiom.om.impl.builder.StAXOMBuilder; -import org.apache.tuscany.sca.binding.notification.encoding.BrokerConsumerReference; -import org.apache.tuscany.sca.binding.notification.encoding.BrokerID; -import org.apache.tuscany.sca.binding.notification.encoding.ConnectionOverride; -import org.apache.tuscany.sca.binding.notification.encoding.Constants; -import org.apache.tuscany.sca.binding.notification.encoding.ConsumerReference; -import org.apache.tuscany.sca.binding.notification.encoding.EncodingRegistry; -import org.apache.tuscany.sca.binding.notification.encoding.EncodingUtils; -import org.apache.tuscany.sca.binding.notification.encoding.EndpointAddress; -import org.apache.tuscany.sca.binding.notification.encoding.EndpointReference; -import org.apache.tuscany.sca.binding.notification.encoding.ReferenceProperties; -import org.apache.tuscany.sca.binding.notification.encoding.Subscribe; -import org.apache.tuscany.sca.binding.notification.util.IOUtils; -import org.apache.tuscany.sca.binding.notification.util.NotificationServlet; -import org.apache.tuscany.sca.binding.notification.util.URIUtil; -import org.apache.tuscany.sca.binding.notification.util.IOUtils.IOUtilsException; -import org.apache.tuscany.sca.binding.notification.util.IOUtils.Writeable; -import org.apache.tuscany.sca.binding.notification.util.NotificationServlet.NotificationServletStreamHandler; -import org.apache.tuscany.sca.core.invocation.MessageImpl; -import org.apache.tuscany.sca.host.http.ServletHost; -import org.apache.tuscany.sca.interfacedef.Interface; -import org.apache.tuscany.sca.interfacedef.InterfaceContract; -import org.apache.tuscany.sca.interfacedef.Operation; -import org.apache.tuscany.sca.invocation.InvocationChain; -import org.apache.tuscany.sca.invocation.Message; -import org.apache.tuscany.sca.provider.ServiceBindingProvider; -import org.apache.tuscany.sca.runtime.RuntimeComponent; -import org.apache.tuscany.sca.runtime.RuntimeComponentService; -import org.apache.tuscany.sca.runtime.RuntimeWire; - -/** - * The runtime representation of the local service binding - * - * @version $Rev$ $Date$ - */ -public class NotificationServiceBindingProvider - implements ServiceBindingProvider, NotificationServletStreamHandler { - - private RuntimeWire wire; - private NotificationBinding notificationBinding; - private RuntimeComponentService service; - private ServletHost servletHost; - private NotificationTypeManager ntm; - private EncodingRegistry encodingRegistry; - private URI notificationType; - private URL myUrl; - private URL remoteNtmUrl; - private static final String consumerPathBase = "/consumer"; - private boolean started; - private NotificationBrokerManager brokerManager; - private String brokerID; - - public NotificationServiceBindingProvider(NotificationBinding notificationBinding, - RuntimeComponent component, - RuntimeComponentService service, - ServletHost servletHost, - NotificationTypeManager ntm, - EncodingRegistry encodingRegistry, - String httpUrl, - NotificationBrokerManager brokerManager) { - this.notificationBinding = notificationBinding; - this.service = service; - this.servletHost = servletHost; - this.ntm = ntm; - this.encodingRegistry = encodingRegistry; - this.notificationType = notificationBinding.getNotificationType(); - String ntmAddress = notificationBinding.getNtmAddress(); - String notificationTypePath = URIUtil.getPath(notificationType); - try { - this.myUrl = new URL(httpUrl + consumerPathBase + notificationTypePath); - remoteNtmUrl = null; - if (ntmAddress != null && notificationType != null) { - remoteNtmUrl = new URL(ntmAddress + notificationTypePath); - } - } catch(Exception e) { - throw new RuntimeException(e); - } - this.started = false; - this.brokerManager = brokerManager; - this.brokerID = null; - - URI uri = URI.create(component.getURI() + "/" + notificationBinding.getName()); - notificationBinding.setURI(uri.toString()); - Interface interfaze = service.getInterfaceContract().getInterface(); - interfaze.resetDataBinding(OMElement.class.getName()); - for (Operation operation : interfaze.getOperations()) { - operation.setNonBlocking(false); - } - } - - public NotificationBinding getBinding() { - return notificationBinding; - } - - public boolean isStarted() { - return started; - } - - public URL getURL() { - return myUrl; - } - - public InterfaceContract getBindingInterfaceContract() { - return service.getInterfaceContract(); - } - - public boolean supportsOneWayInvocation() { - return false; - } - - public void start() { - if (started) { - return; - } - - RuntimeComponentService componentService = (RuntimeComponentService) service; - wire = componentService.getRuntimeWire(notificationBinding); - - for (InvocationChain ch : wire.getInvocationChains()) { - ch.setAllowsPassByReference(true); - } - - brokerManager.serviceProviderStarted(notificationType, this, remoteNtmUrl); - started = true; - } - - public void stop() { - } - - public void deployConsumer() { - WriteableSubscribe ws = new WriteableSubscribe(myUrl, null); - List producerList = new ArrayList(); - String sequenceType = ntm.newConsumer(notificationType, myUrl, remoteNtmUrl, producerList); - if (Constants.EndProducers.equals(sequenceType)) { - for (URL producerUrl : producerList) { - subscribeWithProducer(producerUrl, null, ws); - } - } else if (Constants.BrokerProducers.equals(sequenceType)) { - // Pick a broker producer, for now the first one - URL producerUrl = producerList.get(0); - subscribeWithProducer(producerUrl, null, ws); - } - - servletHost.addServletMapping(myUrl.toString(), new NotificationServlet(this)); - } - - protected void subscribeWithProducer(URL producerUrl, String brokerID, WriteableSubscribe ws) { - if (ws == null) { - ws = new WriteableSubscribe(myUrl, brokerID); - } - try { - IOUtils.sendHttpRequest(producerUrl, Constants.SUBSCRIBE_OP, ws, null); - } catch(Exception e) { - throw new RuntimeException(e); - } - } - - public void deployBroker(String brokerID, EndpointReference brokerProducerEPR, List producerList) { - if (brokerProducerEPR != null) { - subscribeWithProducer(brokerProducerEPR.getEndpointAddress().getAddress(), brokerID, null); - } - this.brokerID = brokerID; - if (producerList != null && !producerList.isEmpty()) { - WriteableConnectionOverride wco = new WriteableConnectionOverride(myUrl, brokerID); - for (EndpointReference producerEPR : producerList) { - try { - IOUtils.sendHttpRequest(producerEPR.getEndpointAddress().getAddress(), Constants.CONNECTION_OVERRIDE_OP, wco, null); - } catch(Exception e) { - throw new RuntimeException(e); - } - } - } - servletHost.addServletMapping(myUrl.toString(), new NotificationServlet(this)); - } - - public void replaceBrokerConnection(EndpointReference chosenBrokerProducerEpr) { - if (brokerID == null) { - throw new RuntimeException("Missing broker id"); - } - URL producerUrl = chosenBrokerProducerEpr.getEndpointAddress().getAddress(); - subscribeWithProducer(producerUrl, brokerID, null); - } - - public void handle(Map headers, ServletInputStream istream, int contentLength, ServletOutputStream ostream) { - String opHeader = headers.get(IOUtils.Notification_Operation); - String incomingBrokerID = headers.get(Constants.Broker_ID); - if (opHeader == null) { - throw new RuntimeException("Missing operation header"); - } - if (wire == null) { - throw new RuntimeException("Missing wire"); - } - InvocationChain chain = null; - for (InvocationChain ch : wire.getInvocationChains()) { - // We may want to use more than just the op name - if(ch.getTargetOperation().getName().equals(opHeader)) { - chain = ch; - break; - } - } - if (chain == null) { - throw new RuntimeException("Can't find invocation chain match for [" + opHeader + "]"); - } - byte[] payload = null; - try { - payload = IOUtils.readFully(istream, contentLength); - } catch(IOException e) { - throw new RuntimeException(e); - } - Object[] args = getArgsFromByteArray(payload, incomingBrokerID); - - invoke(chain, args); - - // Doing nothing to ostream is equivalent to returning null - } - - private Object[] getArgsFromByteArray(byte[] payload, String incomingBrokerID) { - try { - StAXOMBuilder builder = new StAXOMBuilder(new ByteArrayInputStream(payload)); - OMElement element = builder.getDocumentElement(); - return new Object[] { element }; - } catch(Exception e) { - throw new RuntimeException(e); - } - } - - protected void invoke(InvocationChain chain, Object[] args) { - Message msg = new MessageImpl(); - msg.setBody(args); - chain.getHeadInvoker().invoke(msg); - } - - class WriteableSubscribe implements Writeable { - - private Subscribe sub; - - public WriteableSubscribe(URL url, String brokerID) { - EndpointAddress epa = new EndpointAddress(); - epa.setAddress(url); - EndpointReference epr = new EndpointReference(); - epr.setEndpointAddress(epa); - if (brokerID != null) { - BrokerID cbi = new BrokerID(); - cbi.setID(brokerID); - ReferenceProperties crp = new ReferenceProperties(); - crp.addProperty(cbi); - epr.setReferenceProperties(crp); - } - ConsumerReference cr = new ConsumerReference(); - cr.setReference(epr); - sub = new Subscribe(); - sub.setConsumerReference(cr); - } - - public void write(OutputStream os) throws IOUtilsException { - EncodingUtils.encodeToStream(encodingRegistry, sub, os); - } - } - - class WriteableConnectionOverride implements Writeable { - - private ConnectionOverride connectionOverride; - - public WriteableConnectionOverride(URL brokerConsumerUrl, String brokerID) { - EndpointAddress epa = new EndpointAddress(); - epa.setAddress(brokerConsumerUrl); - EndpointReference brokerConsumerEPR = new EndpointReference(); - brokerConsumerEPR.setEndpointAddress(epa); - BrokerID cbi = new BrokerID(); - cbi.setID(brokerID); - ReferenceProperties crp = new ReferenceProperties(); - crp.addProperty(cbi); - brokerConsumerEPR.setReferenceProperties(crp); - BrokerConsumerReference brokerConsumerReference = new BrokerConsumerReference(); - brokerConsumerReference.setReference(brokerConsumerEPR); - connectionOverride = new ConnectionOverride(); - connectionOverride.setBrokerConsumerReference(brokerConsumerReference); - } - - public void write(OutputStream os) throws IOUtilsException { - EncodingUtils.encodeToStream(encodingRegistry, connectionOverride, os); - } - } -} -- cgit v1.2.3