diff options
author | slaws <slaws@13f79535-47bb-0310-9956-ffa450edef68> | 2008-10-21 13:22:46 +0000 |
---|---|---|
committer | slaws <slaws@13f79535-47bb-0310-9956-ffa450edef68> | 2008-10-21 13:22:46 +0000 |
commit | 27135c0af6c70a3cc82e64b5c34c6cbdf9b738c3 (patch) | |
tree | b403505b82aca5c2ef3ae445e5c6b6482ea78223 /java/sca/modules/binding-jms-runtime | |
parent | 8cd9a3eeda24fc639227f2af6d3961974e51dfe9 (diff) |
Experimental request response binding support. Just implemented for jms service binding and only activated when wireFormat appears in the SCDL. Still early stage so many more changes to come.
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@706620 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/sca/modules/binding-jms-runtime')
18 files changed, 1520 insertions, 2 deletions
diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefault.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefault.java new file mode 100644 index 0000000000..74a5368dc6 --- /dev/null +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefault.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.operationselector.jmsdefault; + +import javax.xml.namespace.QName; + +import org.apache.tuscany.sca.assembly.xml.Constants; +import org.apache.tuscany.sca.assembly.WireFormat; + +/** + * Implementation for policies that could be injected as parameter + * into the axis2config. + * + * @version $Rev$ $Date$ + */ +public class OperationSelectorJMSDefault implements WireFormat { + public static final QName OPERATION_SELECTOR_JMS_DEFAULT_QNAME = new QName(Constants.SCA10_TUSCANY_NS, "operationSelector.JMSDefault"); + + public QName getSchemaName() { + return OPERATION_SELECTOR_JMS_DEFAULT_QNAME; + } + + public boolean isUnresolved() { + return false; + } + + public void setUnresolved(boolean unresolved) { + } +} diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultProcessor.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultProcessor.java new file mode 100644 index 0000000000..be483c66dd --- /dev/null +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultProcessor.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.operationselector.jmsdefault; + +import static javax.xml.stream.XMLStreamConstants.END_ELEMENT; +import static javax.xml.stream.XMLStreamConstants.START_ELEMENT; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +import org.apache.tuscany.sca.assembly.xml.Constants; +import org.apache.tuscany.sca.binding.jms.policy.header.JMSHeaderPolicy; +import org.apache.tuscany.sca.contribution.ModelFactoryExtensionPoint; +import org.apache.tuscany.sca.contribution.processor.BaseStAXArtifactProcessor; +import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessor; +import org.apache.tuscany.sca.contribution.resolver.ModelResolver; +import org.apache.tuscany.sca.contribution.service.ContributionReadException; +import org.apache.tuscany.sca.contribution.service.ContributionResolveException; +import org.apache.tuscany.sca.contribution.service.ContributionWriteException; +import org.apache.tuscany.sca.monitor.Monitor; + +/** + * + * @version $Rev$ $Date$ + */ +public class OperationSelectorJMSDefaultProcessor extends BaseStAXArtifactProcessor implements StAXArtifactProcessor<OperationSelectorJMSDefault> { + + public QName getArtifactType() { + return OperationSelectorJMSDefault.OPERATION_SELECTOR_JMS_DEFAULT_QNAME; + } + + public OperationSelectorJMSDefaultProcessor(ModelFactoryExtensionPoint modelFactories, Monitor monitor) { + } + + + public OperationSelectorJMSDefault read(XMLStreamReader reader) throws ContributionReadException, XMLStreamException { + OperationSelectorJMSDefault wireFormat = new OperationSelectorJMSDefault(); + + return wireFormat; + } + + public void write(OperationSelectorJMSDefault wireFormat, XMLStreamWriter writer) + throws ContributionWriteException, XMLStreamException { + String prefix = "tuscany"; + writer.writeStartElement(prefix, + getArtifactType().getLocalPart(), + getArtifactType().getNamespaceURI()); + writer.writeNamespace("tuscany", Constants.SCA10_TUSCANY_NS); + + writer.writeEndElement(); + } + + public Class<OperationSelectorJMSDefault> getModelType() { + return OperationSelectorJMSDefault.class; + } + + public void resolve(OperationSelectorJMSDefault arg0, ModelResolver arg1) throws ContributionResolveException { + + } + +} diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultReferenceInterceptor.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultReferenceInterceptor.java new file mode 100644 index 0000000000..fbe4bd048d --- /dev/null +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultReferenceInterceptor.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.operationselector.jmsdefault; + +import java.util.List; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.naming.NamingException; + +import org.apache.tuscany.sca.assembly.OperationSelector; +import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; +import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessor; +import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessorUtil; +import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.BindingInterceptor; +import org.apache.tuscany.sca.invocation.Interceptor; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; +import org.apache.tuscany.sca.runtime.RuntimeWire; + +/** + * Policy handler to handle PolicySet related to Logging with the QName + * {http://tuscany.apache.org/xmlns/sca/1.0/impl/java}LoggingPolicy + * + * @version $Rev$ $Date$ + */ +public class OperationSelectorJMSDefaultReferenceInterceptor implements BindingInterceptor { + + private static final String ON_MESSAGE_METHOD_NAME = "onMessage"; + + private OperationSelectorJMSDefault operationSelector; + private RuntimeWire runtimeWire; + private JMSResourceFactory jmsResourceFactory; + private JMSBinding jmsBinding; + private JMSMessageProcessor requestMessageProcessor; + private JMSMessageProcessor responseMessageProcessor; + private RuntimeComponentService service; + private List<Operation> serviceOperations; + + + public OperationSelectorJMSDefaultReferenceInterceptor(JMSBinding jmsBinding, JMSResourceFactory jmsResourceFactory, RuntimeWire runtimeWire) { + super(); + this.jmsBinding = jmsBinding; + this.runtimeWire = runtimeWire; + this.jmsResourceFactory = jmsResourceFactory; + this.requestMessageProcessor = JMSMessageProcessorUtil.getRequestMessageProcessor(jmsBinding); + this.responseMessageProcessor = JMSMessageProcessorUtil.getResponseMessageProcessor(jmsBinding); + this.service = (RuntimeComponentService)runtimeWire.getTarget().getContract(); + this.serviceOperations = service.getInterfaceContract().getInterface().getOperations(); + } + + public Message invoke(Message msg) { + // TODO binding interceptor iface TBD + return null; + } + + public Message invokeRequest(Message msg) { + // TODO binding interceptor iface TBD + return null; + } + + public Message invokeResponse(Message msg) { + // TODO binding interceptor iface TBD + return null; + } +} diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultServiceInterceptor.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultServiceInterceptor.java new file mode 100644 index 0000000000..8fdcc355aa --- /dev/null +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/operationselector/jmsdefault/OperationSelectorJMSDefaultServiceInterceptor.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.operationselector.jmsdefault; + +import java.util.List; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; +import javax.security.auth.Subject; + +import org.apache.tuscany.sca.assembly.OperationSelector; +import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; +import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessor; +import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessorUtil; +import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory; +import org.apache.tuscany.sca.core.assembly.EndpointReferenceImpl; +import org.apache.tuscany.sca.core.invocation.MessageImpl; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.BindingInterceptor; +import org.apache.tuscany.sca.invocation.Interceptor; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.policy.SecurityUtil; +import org.apache.tuscany.sca.policy.authentication.token.TokenPrincipal; +import org.apache.tuscany.sca.runtime.EndpointReference; +import org.apache.tuscany.sca.runtime.ReferenceParameters; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; +import org.apache.tuscany.sca.runtime.RuntimeWire; + +/** + * Policy handler to handle PolicySet related to Logging with the QName + * {http://tuscany.apache.org/xmlns/sca/1.0/impl/java}LoggingPolicy + * + * @version $Rev$ $Date$ + */ +public class OperationSelectorJMSDefaultServiceInterceptor implements BindingInterceptor { + + private static final String ON_MESSAGE_METHOD_NAME = "onMessage"; + + private RuntimeWire runtimeWire; + private JMSResourceFactory jmsResourceFactory; + private JMSBinding jmsBinding; + private JMSMessageProcessor requestMessageProcessor; + private JMSMessageProcessor responseMessageProcessor; + private RuntimeComponentService service; + private List<Operation> serviceOperations; + + + public OperationSelectorJMSDefaultServiceInterceptor(JMSBinding jmsBinding, JMSResourceFactory jmsResourceFactory, RuntimeWire runtimeWire) { + super(); + this.jmsBinding = jmsBinding; + this.runtimeWire = runtimeWire; + this.jmsResourceFactory = jmsResourceFactory; + this.requestMessageProcessor = JMSMessageProcessorUtil.getRequestMessageProcessor(jmsBinding); + this.responseMessageProcessor = JMSMessageProcessorUtil.getResponseMessageProcessor(jmsBinding); + this.service = (RuntimeComponentService)runtimeWire.getTarget().getContract(); + this.serviceOperations = service.getInterfaceContract().getInterface().getOperations(); + } + + public Message invoke(Message msg) { + // TODO binding interceptor iface TBD + return null; + } + + public Message invokeRequest(Message msg) { + // get the jms message + javax.jms.Message jmsMsg = (javax.jms.Message)msg.getHeaders().get(JMSBindingConstants.MSG_CTXT_JMSREQUESTMSG_POSITION); + + String operationName = requestMessageProcessor.getOperationName(jmsMsg); + Operation operation = getTargetOperation(operationName); + msg.setOperation(operation); + + return msg; + } + + public Message invokeResponse(Message msg) { + try { + // get the jms message + javax.jms.Message requestJMSMsg = (javax.jms.Message)msg.getHeaders().get(JMSBindingConstants.MSG_CTXT_JMSREQUESTMSG_POSITION); + + // get the jms session + Session session = (Session)msg.getHeaders().get(JMSBindingConstants.MSG_CTXT_JMSSESSION_POSITION); + + Destination destination = requestJMSMsg.getJMSReplyTo(); + MessageProducer producer = session.createProducer(destination); + + producer.send((javax.jms.Message)msg.getBody()); + + producer.close(); + session.close(); + + return msg; + + } catch (JMSException e) { + throw new JMSBindingException(e); + } + } + + protected Operation getTargetOperation(String operationName) { + Operation operation = null; + + if (serviceOperations.size() == 1) { + + // SCA JMS Binding Specification - Rule 1.5.1 line 203 + operation = serviceOperations.get(0); + + } else if (operationName != null) { + + // SCA JMS Binding Specification - Rule 1.5.1 line 205 + for (Operation op : serviceOperations) { + if (op.getName().equals(operationName)) { + operation = op; + break; + } + } + + } else { + + // SCA JMS Binding Specification - Rule 1.5.1 line 207 + for (Operation op : serviceOperations) { + if (op.getName().equals(ON_MESSAGE_METHOD_NAME)) { + operation = op; + break; + } + } + } + + if (operation == null) { + throw new JMSBindingException("Can't find operation " + (operationName != null ? operationName : ON_MESSAGE_METHOD_NAME)); + } + + return operation; + } + +} diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java index f895397c5a..e32d298967 100644 --- a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java @@ -19,6 +19,7 @@ package org.apache.tuscany.sca.binding.jms.provider; +import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -26,6 +27,7 @@ import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic; @@ -36,14 +38,21 @@ import org.apache.tuscany.sca.assembly.Binding; import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; +import org.apache.tuscany.sca.binding.jms.operationselector.jmsdefault.OperationSelectorJMSDefaultReferenceInterceptor; +import org.apache.tuscany.sca.binding.jms.operationselector.jmsdefault.OperationSelectorJMSDefaultServiceInterceptor; +import org.apache.tuscany.sca.binding.jms.wireformat.jmsdefault.WireFormatJMSDefaultReferenceInterceptor; +import org.apache.tuscany.sca.binding.jms.wireformat.jmsdefault.WireFormatJMSDefaultServiceInterceptor; import org.apache.tuscany.sca.binding.ws.WebServiceBinding; import org.apache.tuscany.sca.binding.ws.WebServiceBindingFactory; import org.apache.tuscany.sca.binding.ws.wsdlgen.BindingWSDLGenerator; import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.interfacedef.InterfaceContract; +import org.apache.tuscany.sca.invocation.Invoker; import org.apache.tuscany.sca.provider.ServiceBindingProvider; +import org.apache.tuscany.sca.provider.ServiceBindingProviderRRB; import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.apache.tuscany.sca.runtime.RuntimeComponentService; +import org.apache.tuscany.sca.runtime.RuntimeWire; import org.apache.tuscany.sca.work.WorkScheduler; /** @@ -51,7 +60,7 @@ import org.apache.tuscany.sca.work.WorkScheduler; * * @version $Rev$ $Date$ */ -public class JMSBindingServiceBindingProvider implements ServiceBindingProvider { +public class JMSBindingServiceBindingProvider implements ServiceBindingProviderRRB { private static final Logger logger = Logger.getLogger(JMSBindingServiceBindingProvider.class.getName()); private RuntimeComponentService service; @@ -179,7 +188,20 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProvider consumer = session.createConsumer(destination); } - final DefaultJMSBindingListener listener = new DefaultJMSBindingListener(jmsBinding, jmsResourceFactory, service, targetBinding); + MessageListener tmpListener = null; + + /* + * TODO a test to allow RRB experiments to take place without breaking everything else + * RRB stuff only happens if you add a wireFormat to a composite file + */ + if (jmsBinding.getWireFormat() != null ){ + tmpListener = new RRBJMSBindingListener(jmsBinding, jmsResourceFactory, service, targetBinding); + } else { + tmpListener = new DefaultJMSBindingListener(jmsBinding, jmsResourceFactory, service, targetBinding); + } + + final MessageListener listener = tmpListener; + try { consumer.setMessageListener(listener); @@ -303,4 +325,21 @@ public class JMSBindingServiceBindingProvider implements ServiceBindingProvider throw new JMSBindingException(e); } } + + /* + * RRB test methods + * Interceptor selection is hard coded to the default here but of course should + * pick up the appropriate interceptor based on wireFormat and operationSelector + * elements in the SCDL + */ + public void configureServiceBindingRequestChain(List<Invoker> bindingRequestChain, RuntimeWire runtimeWire) { + + bindingRequestChain.add(new OperationSelectorJMSDefaultServiceInterceptor(jmsBinding, jmsResourceFactory, runtimeWire)); + bindingRequestChain.add(new WireFormatJMSDefaultServiceInterceptor(jmsBinding, jmsResourceFactory, runtimeWire)); + } + + public void configureServiceBindingResponseChain(List<Invoker> bindingResponseChain, RuntimeWire runtimeWire) { + bindingResponseChain.add(new WireFormatJMSDefaultServiceInterceptor(jmsBinding, jmsResourceFactory, runtimeWire)); + bindingResponseChain.add(new OperationSelectorJMSDefaultServiceInterceptor(jmsBinding, jmsResourceFactory, runtimeWire)); + } } diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/RRBJMSBindingListener.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/RRBJMSBindingListener.java new file mode 100644 index 0000000000..c61552299d --- /dev/null +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/RRBJMSBindingListener.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.provider; + +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; +import javax.naming.NamingException; +import javax.security.auth.Subject; + +import org.apache.tuscany.sca.assembly.Binding; +import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; +import org.apache.tuscany.sca.binding.jms.policy.authentication.token.JMSTokenAuthenticationPolicy; +import org.apache.tuscany.sca.core.assembly.EndpointReferenceImpl; +import org.apache.tuscany.sca.core.invocation.MessageImpl; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.BindingInterceptor; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.policy.PolicySet; +import org.apache.tuscany.sca.policy.PolicySetAttachPoint; +import org.apache.tuscany.sca.policy.SecurityUtil; +import org.apache.tuscany.sca.policy.authentication.token.TokenPrincipal; +import org.apache.tuscany.sca.provider.ServiceBindingProviderRRB; +import org.apache.tuscany.sca.runtime.EndpointReference; +import org.apache.tuscany.sca.runtime.ReferenceParameters; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; + +/** + * TODO RRB experiement + * Listener for the JMSBinding. + * + * @version $Rev$ $Date$ + */ +public class RRBJMSBindingListener implements MessageListener { + + private static final Logger logger = Logger.getLogger(RRBJMSBindingListener.class.getName()); + + private static final String ON_MESSAGE_METHOD_NAME = "onMessage"; + private JMSBinding jmsBinding; + private Binding targetBinding; + private JMSResourceFactory jmsResourceFactory; + private RuntimeComponentService service; + private JMSMessageProcessor requestMessageProcessor; + private JMSMessageProcessor responseMessageProcessor; + private String correlationScheme; + private List<Operation> serviceOperations; + protected JMSTokenAuthenticationPolicy jmsTokenAuthenticationPolicy = null; + + /* + * TODO binding chains could be treated generically (RuntimeWire?) but are + * here just now for experimental convenience + */ + private List<Invoker> bindingRequestChain; + private List<Invoker> bindingResponseChain; + + public RRBJMSBindingListener(JMSBinding jmsBinding, JMSResourceFactory jmsResourceFactory, RuntimeComponentService service, Binding targetBinding) throws NamingException { + this.jmsBinding = jmsBinding; + this.jmsResourceFactory = jmsResourceFactory; + this.service = service; + this.targetBinding = targetBinding; + requestMessageProcessor = JMSMessageProcessorUtil.getRequestMessageProcessor(jmsBinding); + responseMessageProcessor = JMSMessageProcessorUtil.getResponseMessageProcessor(jmsBinding); + correlationScheme = jmsBinding.getCorrelationScheme(); + serviceOperations = service.getInterfaceContract().getInterface().getOperations(); + + // find out which policies are active + if (jmsBinding instanceof PolicySetAttachPoint) { + List<PolicySet> policySets = ((PolicySetAttachPoint)jmsBinding).getApplicablePolicySets(); + for (PolicySet ps : policySets) { + for (Object p : ps.getPolicies()) { + if (JMSTokenAuthenticationPolicy.class.isInstance(p)) { + jmsTokenAuthenticationPolicy = (JMSTokenAuthenticationPolicy)p; + }else { + // etc. check for other types of policy being present + } + } + } + } + + // Set up request/response chains for RRB + bindingRequestChain = new ArrayList<Invoker>(); + bindingResponseChain = new ArrayList<Invoker>(); + + ServiceBindingProviderRRB provider = (ServiceBindingProviderRRB)service.getBindingProvider(jmsBinding); + provider.configureServiceBindingRequestChain(bindingRequestChain, service.getRuntimeWire(targetBinding)); + provider.configureServiceBindingResponseChain(bindingResponseChain, service.getRuntimeWire(targetBinding)); + + } + + public void onMessage(Message requestJMSMsg) { + logger.log(Level.FINE, "JMS service '" + service.getName() + "' received message " + requestJMSMsg); + try { + invokeService(requestJMSMsg); + //sendReply(requestJMSMsg, responsePayload, false); + } catch (Throwable e) { + logger.log(Level.SEVERE, "Exception invoking service '" + service.getName(), e); + sendReply(requestJMSMsg, e, true); + } + } + + /** + * TODO RRB Experiment. Explicity call all the binding interceptors on the wire. Looking at + * how to handle requests and responses independently. The binding wire should/could + * be handled generically outside of this binding class but it's here for the momement + * while we look at what form it takes + * + * Turn the JMS message back into a Tuscany message and invoke the target component + * + * @param requestJMSMsg + * @return + * @throws JMSException + * @throws InvocationTargetException + */ + protected void invokeService(Message requestJMSMsg) throws JMSException, InvocationTargetException { + + // create the tuscany message + MessageImpl tuscanyMsg = new MessageImpl(); + + // populate the message context with JMS binding information + tuscanyMsg.getHeaders().add(requestJMSMsg); + + // call the request wire + for (Invoker invoker : bindingRequestChain){ + ((BindingInterceptor)invoker).invokeRequest(tuscanyMsg); + } + + // call the runtime wire + setHeaderProperties(requestJMSMsg, tuscanyMsg, tuscanyMsg.getOperation()); + Object response = service.getRuntimeWire(targetBinding).invoke(tuscanyMsg.getOperation(), tuscanyMsg); + + tuscanyMsg.setBody(response); + + if (requestJMSMsg.getJMSReplyTo() == null) { + // assume no reply is expected + if (response != null) { + logger.log(Level.FINE, "JMS service '" + service.getName() + "' dropped response as request has no replyTo"); + } + return; + } + + // call the response wire + for (Invoker invoker : bindingResponseChain){ + ((BindingInterceptor)invoker).invokeResponse(tuscanyMsg); + } + } + + /** + * TODO - RRB experiment. Needs refactoring + */ + protected void setHeaderProperties(javax.jms.Message requestJMSMsg, MessageImpl tuscanyMsg, Operation operation) throws JMSException { + + EndpointReference from = new EndpointReferenceImpl(null); + tuscanyMsg.setFrom(from); + from.setCallbackEndpoint(new EndpointReferenceImpl("/")); // TODO: whats this for? + ReferenceParameters parameters = from.getReferenceParameters(); + + String conversationID = requestJMSMsg.getStringProperty(JMSBindingConstants.CONVERSATION_ID_PROPERTY); + if (conversationID != null) { + parameters.setConversationID(conversationID); + } + + if (service.getInterfaceContract().getCallbackInterface() != null) { + + String callbackdestName = requestJMSMsg.getStringProperty(JMSBindingConstants.CALLBACK_Q_PROPERTY); + if (callbackdestName == null && operation.isNonBlocking()) { + // if the request has a replyTo but this service operation is oneway but the service uses callbacks + // then use the replyTo as the callback destination + Destination replyTo = requestJMSMsg.getJMSReplyTo(); + if (replyTo != null) { + callbackdestName = (replyTo instanceof Queue) ? ((Queue)replyTo).getQueueName() : ((Topic)replyTo).getTopicName(); + } + } + + if (callbackdestName != null) { + // append "jms:" to make it an absolute uri so the invoker can determine it came in on the request + // as otherwise the invoker should use the uri from the service callback binding + parameters.setCallbackReference(new EndpointReferenceImpl("jms:" + callbackdestName)); + } + + String callbackID = requestJMSMsg.getStringProperty(JMSBindingConstants.CALLBACK_ID_PROPERTY); + if (callbackID != null) { + parameters.setCallbackID(callbackID); + } + } + + + if (jmsTokenAuthenticationPolicy != null) { + String token = requestJMSMsg.getStringProperty(jmsTokenAuthenticationPolicy.getTokenName().toString()); + + Subject subject = SecurityUtil.getSubject(tuscanyMsg); + TokenPrincipal principal = SecurityUtil.getPrincipal(subject, TokenPrincipal.class); + + if (principal == null){ + principal = new TokenPrincipal(token); + subject.getPrincipals().add(principal); + } + + } + } + + /* + * TODO RRB experiment. Still used to handle errors. Needs refactoring + */ + protected void sendReply(Message requestJMSMsg, Object responsePayload, boolean isFault) { + try { + + if (requestJMSMsg.getJMSReplyTo() == null) { + // assume no reply is expected + if (responsePayload != null) { + logger.log(Level.FINE, "JMS service '" + service.getName() + "' dropped response as request has no replyTo"); + } + return; + } + + Session session = jmsResourceFactory.createSession(); + Message replyJMSMsg; + if (isFault) { + replyJMSMsg = responseMessageProcessor.createFaultMessage(session, (Throwable)responsePayload); + } else { + replyJMSMsg = responseMessageProcessor.insertPayloadIntoJMSMessage(session, responsePayload); + } + + replyJMSMsg.setJMSDeliveryMode(requestJMSMsg.getJMSDeliveryMode()); + replyJMSMsg.setJMSPriority(requestJMSMsg.getJMSPriority()); + + if (correlationScheme == null || JMSBindingConstants.CORRELATE_MSG_ID.equalsIgnoreCase(correlationScheme)) { + replyJMSMsg.setJMSCorrelationID(requestJMSMsg.getJMSMessageID()); + } else if (JMSBindingConstants.CORRELATE_CORRELATION_ID.equalsIgnoreCase(correlationScheme)) { + replyJMSMsg.setJMSCorrelationID(requestJMSMsg.getJMSCorrelationID()); + } + + Destination destination = requestJMSMsg.getJMSReplyTo(); + MessageProducer producer = session.createProducer(destination); + + producer.send(replyJMSMsg); + + producer.close(); + session.close(); + + } catch (JMSException e) { + throw new JMSBindingException(e); + } catch (NamingException e) { + throw new JMSBindingException(e); + } + } + +} diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/bytes/WireFormatJMSBytes.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/bytes/WireFormatJMSBytes.java new file mode 100644 index 0000000000..899bbbc13f --- /dev/null +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/bytes/WireFormatJMSBytes.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.wireformat.bytes; + +import javax.xml.namespace.QName; + +import org.apache.tuscany.sca.assembly.xml.Constants; +import org.apache.tuscany.sca.assembly.WireFormat; + +/** + * Implementation for policies that could be injected as parameter + * into the axis2config. + * + * @version $Rev$ $Date$ + */ +public class WireFormatJMSBytes implements WireFormat { + public static final QName WIRE_FORMAT_JMS_BYTE_QNAME = new QName(Constants.SCA10_TUSCANY_NS, "wireFormat.JMSBytes"); + + public QName getSchemaName() { + return WIRE_FORMAT_JMS_BYTE_QNAME; + } + + public boolean isUnresolved() { + return false; + } + + public void setUnresolved(boolean unresolved) { + } +} diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/bytes/WireFormatJMSBytesProcessor.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/bytes/WireFormatJMSBytesProcessor.java new file mode 100644 index 0000000000..62b87fd54f --- /dev/null +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/bytes/WireFormatJMSBytesProcessor.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.wireformat.bytes; + +import static javax.xml.stream.XMLStreamConstants.END_ELEMENT; +import static javax.xml.stream.XMLStreamConstants.START_ELEMENT; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +import org.apache.tuscany.sca.assembly.xml.Constants; +import org.apache.tuscany.sca.binding.jms.policy.header.JMSHeaderPolicy; +import org.apache.tuscany.sca.contribution.ModelFactoryExtensionPoint; +import org.apache.tuscany.sca.contribution.processor.BaseStAXArtifactProcessor; +import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessor; +import org.apache.tuscany.sca.contribution.resolver.ModelResolver; +import org.apache.tuscany.sca.contribution.service.ContributionReadException; +import org.apache.tuscany.sca.contribution.service.ContributionResolveException; +import org.apache.tuscany.sca.contribution.service.ContributionWriteException; +import org.apache.tuscany.sca.monitor.Monitor; + +/** + * + * @version $Rev$ $Date$ + */ +public class WireFormatJMSBytesProcessor extends BaseStAXArtifactProcessor implements StAXArtifactProcessor<WireFormatJMSBytes> { + + public QName getArtifactType() { + return WireFormatJMSBytes.WIRE_FORMAT_JMS_BYTE_QNAME; + } + + public WireFormatJMSBytesProcessor(ModelFactoryExtensionPoint modelFactories, Monitor monitor) { + } + + + public WireFormatJMSBytes read(XMLStreamReader reader) throws ContributionReadException, XMLStreamException { + WireFormatJMSBytes wireFormat = new WireFormatJMSBytes(); + + return wireFormat; + } + + public void write(WireFormatJMSBytes wireFormat, XMLStreamWriter writer) + throws ContributionWriteException, XMLStreamException { + String prefix = "tuscany"; + writer.writeStartElement(prefix, + getArtifactType().getLocalPart(), + getArtifactType().getNamespaceURI()); + writer.writeNamespace("tuscany", Constants.SCA10_TUSCANY_NS); + + writer.writeEndElement(); + } + + public Class<WireFormatJMSBytes> getModelType() { + return WireFormatJMSBytes.class; + } + + public void resolve(WireFormatJMSBytes arg0, ModelResolver arg1) throws ContributionResolveException { + + } + +} diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/bytes/WireFormatJMSBytesProviderFactory.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/bytes/WireFormatJMSBytesProviderFactory.java new file mode 100644 index 0000000000..f26ef4f546 --- /dev/null +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/bytes/WireFormatJMSBytesProviderFactory.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.jms.wireformat.bytes; + +import org.apache.tuscany.sca.assembly.Binding; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.provider.WireFormatProvider; +import org.apache.tuscany.sca.provider.WireFormatProviderFactory; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.apache.tuscany.sca.runtime.RuntimeComponentReference; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; + +/** + * @version $Rev$ $Date$ + */ +public class WireFormatJMSBytesProviderFactory implements WireFormatProviderFactory<WireFormatJMSBytes> { + private ExtensionPointRegistry registry; + + public WireFormatJMSBytesProviderFactory(ExtensionPointRegistry registry) { + super(); + this.registry = registry; + } + + /** + */ + public WireFormatProvider createReferenceWireFormatProvider(RuntimeComponent component, + RuntimeComponentReference reference, + Binding binding) { + return new WireFormatJMSBytesReferenceProvider(component, reference, binding); + } + + /** + */ + public WireFormatProvider createServiceWireFormatProvider(RuntimeComponent component, + RuntimeComponentService service, + Binding binding) { + return new WireFormatJMSBytesServiceProvider(component, service, binding); + } + + /** + * @see org.apache.tuscany.sca.provider.ProviderFactory#getModelType() + */ + public Class getModelType() { + // TODO Auto-generated method stub + return null; + } + +} diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/bytes/WireFormatJMSBytesReferenceInterceptor.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/bytes/WireFormatJMSBytesReferenceInterceptor.java new file mode 100644 index 0000000000..edc998bcd9 --- /dev/null +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/bytes/WireFormatJMSBytesReferenceInterceptor.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.wireformat.bytes; + + +import javax.jms.Session; + +import org.apache.tuscany.sca.assembly.WireFormat; +import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; +import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessor; +import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessorUtil; +import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory; +import org.apache.tuscany.sca.invocation.Interceptor; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.runtime.RuntimeWire; + +/** + * + * @version $Rev$ $Date$ + */ +public class WireFormatJMSBytesReferenceInterceptor implements Interceptor { + + private Invoker next; + private RuntimeWire runtimeWire; + private JMSResourceFactory jmsResourceFactory; + private JMSBinding jmsBinding; + private JMSMessageProcessor requestMessageProcessor; + private JMSMessageProcessor responseMessageProcessor; + + public WireFormatJMSBytesReferenceInterceptor(JMSBinding jmsBinding, JMSResourceFactory jmsResourceFactory, RuntimeWire runtimeWire) { + super(); + this.jmsBinding = jmsBinding; + this.runtimeWire = runtimeWire; + this.jmsResourceFactory = jmsResourceFactory; + this.requestMessageProcessor = JMSMessageProcessorUtil.getRequestMessageProcessor(jmsBinding); + this.responseMessageProcessor = JMSMessageProcessorUtil.getResponseMessageProcessor(jmsBinding); + } + + public Message invoke(Message msg) { + + if (next != null){ + return getNext().invoke(msg); + } else { + return msg; + } + } + + public Invoker getNext() { + return next; + } + + public void setNext(Invoker next) { + this.next = next; + } +} diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/bytes/WireFormatJMSBytesReferenceProvider.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/bytes/WireFormatJMSBytesReferenceProvider.java new file mode 100644 index 0000000000..0aaca0c909 --- /dev/null +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/bytes/WireFormatJMSBytesReferenceProvider.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.jms.wireformat.bytes; + +import java.util.List; + +import org.apache.tuscany.sca.assembly.Binding; +import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.Interceptor; +import org.apache.tuscany.sca.invocation.Phase; +import org.apache.tuscany.sca.policy.PolicySet; +import org.apache.tuscany.sca.policy.PolicySetAttachPoint; +import org.apache.tuscany.sca.policy.util.PolicyHandler; +import org.apache.tuscany.sca.provider.PolicyProvider; +import org.apache.tuscany.sca.provider.WireFormatProvider; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.apache.tuscany.sca.runtime.RuntimeComponentReference; + +/** + * @version $Rev$ $Date$ + */ +public class WireFormatJMSBytesReferenceProvider implements WireFormatProvider { + private RuntimeComponent component; + private RuntimeComponentReference reference; + private Binding binding; + + public WireFormatJMSBytesReferenceProvider(RuntimeComponent component, + RuntimeComponentReference reference, + Binding binding) { + super(); + this.component = component; + this.reference = reference; + this.binding = binding; + } + + /** + * @see org.apache.tuscany.sca.provider.PolicyProvider#createInterceptor(org.apache.tuscany.sca.interfacedef.Operation) + */ + public Interceptor createInterceptor() { + return new WireFormatJMSBytesReferenceInterceptor((JMSBinding)binding, + null, + reference.getRuntimeWire(binding)); + } + + /** + * @see org.apache.tuscany.sca.provider.PolicyProvider#getPhase() + */ + public String getPhase() { + return Phase.REFERENCE_POLICY; + } + +} diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/bytes/WireFormatJMSBytesServiceInterceptor.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/bytes/WireFormatJMSBytesServiceInterceptor.java new file mode 100644 index 0000000000..4bcc23edac --- /dev/null +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/bytes/WireFormatJMSBytesServiceInterceptor.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.wireformat.bytes; + +import org.apache.tuscany.sca.assembly.WireFormat; +import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; +import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessor; +import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessorUtil; +import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory; +import org.apache.tuscany.sca.invocation.Interceptor; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.runtime.RuntimeWire; + +/** + * Policy handler to handle PolicySet related to Logging with the QName + * {http://tuscany.apache.org/xmlns/sca/1.0/impl/java}LoggingPolicy + * + * @version $Rev$ $Date$ + */ +public class WireFormatJMSBytesServiceInterceptor implements Interceptor { + private Invoker next; + private RuntimeWire runtimeWire; + private JMSResourceFactory jmsResourceFactory; + private JMSBinding jmsBinding; + private JMSMessageProcessor requestMessageProcessor; + private JMSMessageProcessor responseMessageProcessor; + + public WireFormatJMSBytesServiceInterceptor(JMSBinding jmsBinding, JMSResourceFactory jmsResourceFactory, RuntimeWire runtimeWire) { + super(); + this.jmsBinding = jmsBinding; + this.runtimeWire = runtimeWire; + this.jmsResourceFactory = jmsResourceFactory; + this.requestMessageProcessor = JMSMessageProcessorUtil.getRequestMessageProcessor(jmsBinding); + this.responseMessageProcessor = JMSMessageProcessorUtil.getResponseMessageProcessor(jmsBinding); + } + + + public Message invoke(Message msg) { + // get the jms message + javax.jms.Message jmsMsg = (javax.jms.Message)msg.getHeaders().get(JMSBindingConstants.MSG_CTXT_JMSREQUESTMSG_POSITION); + + msg.setBody(requestMessageProcessor.extractPayloadFromJMSMessage(jmsMsg)); + + if (next != null){ + return getNext().invoke(msg); + } else { + return msg; + } + + } + + public Invoker getNext() { + return next; + } + + public void setNext(Invoker next) { + this.next = next; + } +} diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/bytes/WireFormatJMSBytesServiceProvider.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/bytes/WireFormatJMSBytesServiceProvider.java new file mode 100644 index 0000000000..059b0e52cb --- /dev/null +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/bytes/WireFormatJMSBytesServiceProvider.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.jms.wireformat.bytes; + +import java.util.List; + +import org.apache.tuscany.sca.assembly.Binding; +import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.Interceptor; +import org.apache.tuscany.sca.invocation.Phase; +import org.apache.tuscany.sca.policy.PolicySet; +import org.apache.tuscany.sca.policy.PolicySetAttachPoint; +import org.apache.tuscany.sca.policy.util.PolicyHandler; +import org.apache.tuscany.sca.provider.PolicyProvider; +import org.apache.tuscany.sca.provider.WireFormatProvider; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; + +/** + * @version $Rev$ $Date$ + */ +public class WireFormatJMSBytesServiceProvider implements WireFormatProvider { + private RuntimeComponent component; + private RuntimeComponentService service; + private Binding binding; + + public WireFormatJMSBytesServiceProvider(RuntimeComponent component, RuntimeComponentService service, Binding binding) { + super(); + this.component = component; + this.service = service; + this.binding = binding; + } + + /** + */ + public Interceptor createInterceptor() { + return new WireFormatJMSBytesServiceInterceptor((JMSBinding)binding, + null, + service.getRuntimeWire(binding)); + } + + /** + */ + public String getPhase() { + return Phase.SERVICE_POLICY; + } + +} diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefault.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefault.java new file mode 100644 index 0000000000..8cce4b0d10 --- /dev/null +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefault.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.wireformat.jmsdefault; + +import javax.xml.namespace.QName; + +import org.apache.tuscany.sca.assembly.xml.Constants; +import org.apache.tuscany.sca.assembly.WireFormat; + +/** + * + * @version $Rev$ $Date$ + */ +public class WireFormatJMSDefault implements WireFormat { + public static final QName WIRE_FORMAT_JMS_DEFAULT_QNAME = new QName(Constants.SCA10_TUSCANY_NS, "wireFormat.JMSDefault"); + + public QName getSchemaName() { + return WIRE_FORMAT_JMS_DEFAULT_QNAME; + } + + public boolean isUnresolved() { + return false; + } + + public void setUnresolved(boolean unresolved) { + } +} diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefaultProcessor.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefaultProcessor.java new file mode 100644 index 0000000000..4ffcd22f46 --- /dev/null +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefaultProcessor.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.wireformat.jmsdefault; + +import static javax.xml.stream.XMLStreamConstants.END_ELEMENT; +import static javax.xml.stream.XMLStreamConstants.START_ELEMENT; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +import org.apache.tuscany.sca.assembly.xml.Constants; +import org.apache.tuscany.sca.binding.jms.policy.header.JMSHeaderPolicy; +import org.apache.tuscany.sca.contribution.ModelFactoryExtensionPoint; +import org.apache.tuscany.sca.contribution.processor.BaseStAXArtifactProcessor; +import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessor; +import org.apache.tuscany.sca.contribution.resolver.ModelResolver; +import org.apache.tuscany.sca.contribution.service.ContributionReadException; +import org.apache.tuscany.sca.contribution.service.ContributionResolveException; +import org.apache.tuscany.sca.contribution.service.ContributionWriteException; +import org.apache.tuscany.sca.monitor.Monitor; + +/** + * + * @version $Rev$ $Date$ + */ +public class WireFormatJMSDefaultProcessor extends BaseStAXArtifactProcessor implements StAXArtifactProcessor<WireFormatJMSDefault> { + + public QName getArtifactType() { + return WireFormatJMSDefault.WIRE_FORMAT_JMS_DEFAULT_QNAME; + } + + public WireFormatJMSDefaultProcessor(ModelFactoryExtensionPoint modelFactories, Monitor monitor) { + } + + + public WireFormatJMSDefault read(XMLStreamReader reader) throws ContributionReadException, XMLStreamException { + WireFormatJMSDefault wireFormat = new WireFormatJMSDefault(); + + return wireFormat; + } + + public void write(WireFormatJMSDefault wireFormat, XMLStreamWriter writer) + throws ContributionWriteException, XMLStreamException { + String prefix = "tuscany"; + writer.writeStartElement(prefix, + getArtifactType().getLocalPart(), + getArtifactType().getNamespaceURI()); + writer.writeNamespace("tuscany", Constants.SCA10_TUSCANY_NS); + + writer.writeEndElement(); + } + + public Class<WireFormatJMSDefault> getModelType() { + return WireFormatJMSDefault.class; + } + + public void resolve(WireFormatJMSDefault arg0, ModelResolver arg1) throws ContributionResolveException { + + } + +} diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefaultReferenceInterceptor.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefaultReferenceInterceptor.java new file mode 100644 index 0000000000..e17c9a8c9b --- /dev/null +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefaultReferenceInterceptor.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.wireformat.jmsdefault; + + +import javax.jms.JMSException; +import javax.jms.Session; +import javax.naming.NamingException; + +import org.apache.tuscany.sca.assembly.WireFormat; +import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; +import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessor; +import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessorUtil; +import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory; +import org.apache.tuscany.sca.invocation.BindingInterceptor; +import org.apache.tuscany.sca.invocation.Interceptor; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.runtime.RuntimeWire; + +/** + * + * @version $Rev$ $Date$ + */ +public class WireFormatJMSDefaultReferenceInterceptor implements BindingInterceptor { + + private Invoker next; + private RuntimeWire runtimeWire; + private JMSResourceFactory jmsResourceFactory; + private JMSBinding jmsBinding; + private JMSMessageProcessor requestMessageProcessor; + private JMSMessageProcessor responseMessageProcessor; + private String correlationScheme; + + public WireFormatJMSDefaultReferenceInterceptor(JMSBinding jmsBinding, JMSResourceFactory jmsResourceFactory, RuntimeWire runtimeWire) { + super(); + this.jmsBinding = jmsBinding; + this.runtimeWire = runtimeWire; + this.jmsResourceFactory = jmsResourceFactory; + this.requestMessageProcessor = JMSMessageProcessorUtil.getRequestMessageProcessor(jmsBinding); + this.responseMessageProcessor = JMSMessageProcessorUtil.getResponseMessageProcessor(jmsBinding); + this.correlationScheme = jmsBinding.getCorrelationScheme(); + } + + public Message invoke(Message msg) { + // TODO binding interceptor iface TBD + return null; + } + + public Message invokeRequest(Message msg) { + // TODO binding interceptor iface TBD + return null; + } + + public Message invokeResponse(Message msg) { + // TODO binding interceptor iface TBD + return null; + } + + public Invoker getNext() { + return next; + } + + public void setNext(Invoker next) { + this.next = next; + } +} diff --git a/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefaultServiceInterceptor.java b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefaultServiceInterceptor.java new file mode 100644 index 0000000000..5c53bcf009 --- /dev/null +++ b/java/sca/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wireformat/jmsdefault/WireFormatJMSDefaultServiceInterceptor.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.jms.wireformat.jmsdefault; + +import javax.jms.JMSException; +import javax.jms.Session; +import javax.naming.NamingException; + +import org.apache.tuscany.sca.assembly.WireFormat; +import org.apache.tuscany.sca.binding.jms.impl.JMSBinding; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingConstants; +import org.apache.tuscany.sca.binding.jms.impl.JMSBindingException; +import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessor; +import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessorUtil; +import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory; +import org.apache.tuscany.sca.invocation.BindingInterceptor; +import org.apache.tuscany.sca.invocation.Interceptor; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.runtime.RuntimeWire; + +/** + * Policy handler to handle PolicySet related to Logging with the QName + * {http://tuscany.apache.org/xmlns/sca/1.0/impl/java}LoggingPolicy + * + * @version $Rev$ $Date$ + */ +public class WireFormatJMSDefaultServiceInterceptor implements BindingInterceptor { + private Invoker next; + private RuntimeWire runtimeWire; + private JMSResourceFactory jmsResourceFactory; + private JMSBinding jmsBinding; + private JMSMessageProcessor requestMessageProcessor; + private JMSMessageProcessor responseMessageProcessor; + private String correlationScheme; + + public WireFormatJMSDefaultServiceInterceptor(JMSBinding jmsBinding, JMSResourceFactory jmsResourceFactory, RuntimeWire runtimeWire) { + super(); + this.jmsBinding = jmsBinding; + this.runtimeWire = runtimeWire; + this.jmsResourceFactory = jmsResourceFactory; + this.requestMessageProcessor = JMSMessageProcessorUtil.getRequestMessageProcessor(jmsBinding); + this.responseMessageProcessor = JMSMessageProcessorUtil.getResponseMessageProcessor(jmsBinding); + this.correlationScheme = jmsBinding.getCorrelationScheme(); + } + + public Message invoke(Message msg) { + // TODO binding interceptor iface TBD + return null; + } + + public Message invokeRequest(Message msg) { + // get the jms message + javax.jms.Message jmsMsg = (javax.jms.Message)msg.getHeaders().get(JMSBindingConstants.MSG_CTXT_JMSREQUESTMSG_POSITION); + + if ("onMessage".equals(msg.getOperation().getName())) { + msg.setBody(new Object[]{jmsMsg}); + } else { + Object requestPayload = requestMessageProcessor.extractPayloadFromJMSMessage(jmsMsg); + msg.setBody(requestPayload); + } + + if (next != null){ + return getNext().invoke(msg); + } else { + return msg; + } + } + + public Message invokeResponse(Message msg) { + try { + // get the jms message + javax.jms.Message requestJMSMsg = (javax.jms.Message)msg.getHeaders().get(JMSBindingConstants.MSG_CTXT_JMSREQUESTMSG_POSITION); + + Session session = jmsResourceFactory.createSession(); + msg.getHeaders().add(JMSBindingConstants.MSG_CTXT_JMSSESSION_POSITION, session); + + javax.jms.Message responseJMSMsg; + if (msg.isFault()) { + responseJMSMsg = responseMessageProcessor.createFaultMessage(session, (Throwable)msg.getBody()); + } else { + responseJMSMsg = responseMessageProcessor.insertPayloadIntoJMSMessage(session, msg.getBody()); + } + + responseJMSMsg.setJMSDeliveryMode(requestJMSMsg.getJMSDeliveryMode()); + responseJMSMsg.setJMSPriority(requestJMSMsg.getJMSPriority()); + + if (correlationScheme == null || JMSBindingConstants.CORRELATE_MSG_ID.equalsIgnoreCase(correlationScheme)) { + responseJMSMsg.setJMSCorrelationID(requestJMSMsg.getJMSMessageID()); + } else if (JMSBindingConstants.CORRELATE_CORRELATION_ID.equalsIgnoreCase(correlationScheme)) { + responseJMSMsg.setJMSCorrelationID(requestJMSMsg.getJMSCorrelationID()); + } + + msg.setBody(responseJMSMsg); + + if (next != null){ + return getNext().invoke(msg); + } else { + return msg; + } + } catch (JMSException e) { + throw new JMSBindingException(e); + } catch (NamingException e) { + throw new JMSBindingException(e); + } + } + + public Invoker getNext() { + return next; + } + + public void setNext(Invoker next) { + this.next = next; + } +} diff --git a/java/sca/modules/binding-jms-runtime/src/main/resources/META-INF/services/org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessor b/java/sca/modules/binding-jms-runtime/src/main/resources/META-INF/services/org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessor new file mode 100644 index 0000000000..6a4fcca25f --- /dev/null +++ b/java/sca/modules/binding-jms-runtime/src/main/resources/META-INF/services/org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessor @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Implementation class for the artifact processor extension
+org.apache.tuscany.sca.binding.jms.wireformat.jmsdefault.WireFormatJMSDefaultProcessor;qname=http://www.osoa.org/xmlns/sca/1.0#wireFormat.jmsDefault,model=org.apache.tuscany.sca.binding.jms.wireformat.jmsdefault.WireFormatJMSDefault
+org.apache.tuscany.sca.binding.jms.operationselector.jmsdefault.OperationSelectorJMSDefaultProcessor;qname=http://www.osoa.org/xmlns/sca/1.0#operationSelector.jmsDefault,model=org.apache.tuscany.sca.binding.jms.operationselector.jmsdefault.OperationSelectorJMSDefault
|