/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.tuscany.sca.binding.notification; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.OutputStream; import java.net.URI; import java.net.URL; import java.util.ArrayList; import java.util.List; import java.util.Map; import javax.servlet.ServletInputStream; import javax.servlet.ServletOutputStream; import org.apache.axiom.om.OMElement; import org.apache.axiom.om.impl.builder.StAXOMBuilder; import org.apache.tuscany.sca.binding.notification.encoding.BrokerConsumerReference; import org.apache.tuscany.sca.binding.notification.encoding.BrokerID; import org.apache.tuscany.sca.binding.notification.encoding.ConnectionOverride; import org.apache.tuscany.sca.binding.notification.encoding.Constants; import org.apache.tuscany.sca.binding.notification.encoding.ConsumerReference; import org.apache.tuscany.sca.binding.notification.encoding.EncodingRegistry; import org.apache.tuscany.sca.binding.notification.encoding.EncodingUtils; import org.apache.tuscany.sca.binding.notification.encoding.EndpointAddress; import org.apache.tuscany.sca.binding.notification.encoding.EndpointReference; import org.apache.tuscany.sca.binding.notification.encoding.ReferenceProperties; import org.apache.tuscany.sca.binding.notification.encoding.Subscribe; import org.apache.tuscany.sca.binding.notification.util.IOUtils; import org.apache.tuscany.sca.binding.notification.util.NotificationServlet; import org.apache.tuscany.sca.binding.notification.util.URIUtil; import org.apache.tuscany.sca.binding.notification.util.IOUtils.IOUtilsException; import org.apache.tuscany.sca.binding.notification.util.IOUtils.Writeable; import org.apache.tuscany.sca.binding.notification.util.NotificationServlet.NotificationServletStreamHandler; import org.apache.tuscany.sca.core.invocation.MessageImpl; import org.apache.tuscany.sca.host.http.ServletHost; import org.apache.tuscany.sca.interfacedef.Interface; import org.apache.tuscany.sca.interfacedef.InterfaceContract; import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.invocation.InvocationChain; import org.apache.tuscany.sca.invocation.Message; import org.apache.tuscany.sca.provider.ServiceBindingProvider; import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.apache.tuscany.sca.runtime.RuntimeComponentService; import org.apache.tuscany.sca.runtime.RuntimeWire; /** * The runtime representation of the local service binding * * @version $Rev$ $Date$ */ public class NotificationServiceBindingProvider implements ServiceBindingProvider, NotificationServletStreamHandler { private RuntimeWire wire; private NotificationBinding notificationBinding; private RuntimeComponentService service; private ServletHost servletHost; private NotificationTypeManager ntm; private EncodingRegistry encodingRegistry; private URI notificationType; private URL myUrl; private URL remoteNtmUrl; private static final String consumerPathBase = "/consumer"; private boolean started; private NotificationBrokerManager brokerManager; private String brokerID; public NotificationServiceBindingProvider(NotificationBinding notificationBinding, RuntimeComponent component, RuntimeComponentService service, ServletHost servletHost, NotificationTypeManager ntm, EncodingRegistry encodingRegistry, String httpUrl, NotificationBrokerManager brokerManager) { this.notificationBinding = notificationBinding; this.service = service; this.servletHost = servletHost; this.ntm = ntm; this.encodingRegistry = encodingRegistry; this.notificationType = notificationBinding.getNotificationType(); String ntmAddress = notificationBinding.getNtmAddress(); String notificationTypePath = URIUtil.getPath(notificationType); try { this.myUrl = new URL(httpUrl + consumerPathBase + notificationTypePath); remoteNtmUrl = null; if (ntmAddress != null && notificationType != null) { remoteNtmUrl = new URL(ntmAddress + notificationTypePath); } } catch(Exception e) { throw new RuntimeException(e); } this.started = false; this.brokerManager = brokerManager; this.brokerID = null; URI uri = URI.create(component.getURI() + "/" + notificationBinding.getName()); notificationBinding.setURI(uri.toString()); Interface interfaze = service.getInterfaceContract().getInterface(); interfaze.resetDataBinding(OMElement.class.getName()); for (Operation operation : interfaze.getOperations()) { operation.setNonBlocking(false); } } public NotificationBinding getBinding() { return notificationBinding; } public boolean isStarted() { return started; } public URL getURL() { return myUrl; } public InterfaceContract getBindingInterfaceContract() { return service.getInterfaceContract(); } public boolean supportsOneWayInvocation() { return false; } public void start() { if (started) { return; } RuntimeComponentService componentService = (RuntimeComponentService) service; wire = componentService.getRuntimeWire(notificationBinding); for (InvocationChain ch : wire.getInvocationChains()) { ch.setAllowsPassByReference(true); } brokerManager.serviceProviderStarted(notificationType, this, remoteNtmUrl); started = true; } public void stop() { } public void deployConsumer() { WriteableSubscribe ws = new WriteableSubscribe(myUrl, null); List producerList = new ArrayList(); String sequenceType = ntm.newConsumer(notificationType, myUrl, remoteNtmUrl, producerList); if (Constants.EndProducers.equals(sequenceType)) { for (URL producerUrl : producerList) { subscribeWithProducer(producerUrl, null, ws); } } else if (Constants.BrokerProducers.equals(sequenceType)) { // Pick a broker producer, for now the first one URL producerUrl = producerList.get(0); subscribeWithProducer(producerUrl, null, ws); } servletHost.addServletMapping(myUrl.toString(), new NotificationServlet(this)); } protected void subscribeWithProducer(URL producerUrl, String brokerID, WriteableSubscribe ws) { if (ws == null) { ws = new WriteableSubscribe(myUrl, brokerID); } try { IOUtils.sendHttpRequest(producerUrl, Constants.SUBSCRIBE_OP, ws, null); } catch(Exception e) { throw new RuntimeException(e); } } public void deployBroker(String brokerID, EndpointReference brokerProducerEPR, List producerList) { if (brokerProducerEPR != null) { subscribeWithProducer(brokerProducerEPR.getEndpointAddress().getAddress(), brokerID, null); } this.brokerID = brokerID; if (producerList != null && !producerList.isEmpty()) { WriteableConnectionOverride wco = new WriteableConnectionOverride(myUrl, brokerID); for (EndpointReference producerEPR : producerList) { try { IOUtils.sendHttpRequest(producerEPR.getEndpointAddress().getAddress(), Constants.CONNECTION_OVERRIDE_OP, wco, null); } catch(Exception e) { throw new RuntimeException(e); } } } servletHost.addServletMapping(myUrl.toString(), new NotificationServlet(this)); } public void replaceBrokerConnection(EndpointReference chosenBrokerProducerEpr) { if (brokerID == null) { throw new RuntimeException("Missing broker id"); } URL producerUrl = chosenBrokerProducerEpr.getEndpointAddress().getAddress(); subscribeWithProducer(producerUrl, brokerID, null); } public void handle(Map headers, ServletInputStream istream, int contentLength, ServletOutputStream ostream) { String opHeader = headers.get(IOUtils.Notification_Operation); String incomingBrokerID = headers.get(Constants.Broker_ID); if (opHeader == null) { throw new RuntimeException("Missing operation header"); } if (wire == null) { throw new RuntimeException("Missing wire"); } InvocationChain chain = null; for (InvocationChain ch : wire.getInvocationChains()) { // We may want to use more than just the op name if(ch.getTargetOperation().getName().equals(opHeader)) { chain = ch; break; } } if (chain == null) { throw new RuntimeException("Can't find invocation chain match for [" + opHeader + "]"); } byte[] payload = null; try { payload = IOUtils.readFully(istream, contentLength); } catch(IOException e) { throw new RuntimeException(e); } Object[] args = getArgsFromByteArray(payload, incomingBrokerID); invoke(chain, args); // Doing nothing to ostream is equivalent to returning null } private Object[] getArgsFromByteArray(byte[] payload, String incomingBrokerID) { try { StAXOMBuilder builder = new StAXOMBuilder(new ByteArrayInputStream(payload)); OMElement element = builder.getDocumentElement(); return new Object[] { element }; } catch(Exception e) { throw new RuntimeException(e); } } protected void invoke(InvocationChain chain, Object[] args) { Message msg = new MessageImpl(); msg.setBody(args); chain.getHeadInvoker().invoke(msg); } class WriteableSubscribe implements Writeable { private Subscribe sub; public WriteableSubscribe(URL url, String brokerID) { EndpointAddress epa = new EndpointAddress(); epa.setAddress(url); EndpointReference epr = new EndpointReference(); epr.setEndpointAddress(epa); if (brokerID != null) { BrokerID cbi = new BrokerID(); cbi.setID(brokerID); ReferenceProperties crp = new ReferenceProperties(); crp.addProperty(cbi); epr.setReferenceProperties(crp); } ConsumerReference cr = new ConsumerReference(); cr.setReference(epr); sub = new Subscribe(); sub.setConsumerReference(cr); } public void write(OutputStream os) throws IOUtilsException { EncodingUtils.encodeToStream(encodingRegistry, sub, os); } } class WriteableConnectionOverride implements Writeable { private ConnectionOverride connectionOverride; public WriteableConnectionOverride(URL brokerConsumerUrl, String brokerID) { EndpointAddress epa = new EndpointAddress(); epa.setAddress(brokerConsumerUrl); EndpointReference brokerConsumerEPR = new EndpointReference(); brokerConsumerEPR.setEndpointAddress(epa); BrokerID cbi = new BrokerID(); cbi.setID(brokerID); ReferenceProperties crp = new ReferenceProperties(); crp.addProperty(cbi); brokerConsumerEPR.setReferenceProperties(crp); BrokerConsumerReference brokerConsumerReference = new BrokerConsumerReference(); brokerConsumerReference.setReference(brokerConsumerEPR); connectionOverride = new ConnectionOverride(); connectionOverride.setBrokerConsumerReference(brokerConsumerReference); } public void write(OutputStream os) throws IOUtilsException { EncodingUtils.encodeToStream(encodingRegistry, connectionOverride, os); } } }