diff options
author | dims <dims@13f79535-47bb-0310-9956-ffa450edef68> | 2008-06-17 00:23:01 +0000 |
---|---|---|
committer | dims <dims@13f79535-47bb-0310-9956-ffa450edef68> | 2008-06-17 00:23:01 +0000 |
commit | bdd0a41aed7edf21ec2a65cfa17a86af2ef8c48a (patch) | |
tree | 38a92061c0793434c4be189f1d70c3458b6bc41d /branches/sca-java-1.2.1/modules/binding-notification/src |
Move Tuscany from Incubator to top level.
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@668359 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'branches/sca-java-1.2.1/modules/binding-notification/src')
85 files changed, 6685 insertions, 0 deletions
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/DefaultNotificationBindingFactory.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/DefaultNotificationBindingFactory.java new file mode 100644 index 0000000000..e320357d93 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/DefaultNotificationBindingFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification; + +/** + * Creates runtime artifacts for the notification binding + * + * @version $Rev$ $Date$ + */ +public class DefaultNotificationBindingFactory implements NotificationBindingFactory { + + public NotificationBinding createNotificationBinding() { + return new NotificationBindingImpl(); + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBinding.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBinding.java new file mode 100644 index 0000000000..f09798859b --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBinding.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification; + +import java.net.URI; + +import org.apache.tuscany.sca.assembly.Binding; + +/** + * @version $Rev$ $Date$ + */ +public interface NotificationBinding extends Binding { + + String getNtmAddress(); + + void setNtmAddress(String ntm); + + URI getNotificationType(); + + void setNotificationType(URI notificationType); +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingFactory.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingFactory.java new file mode 100644 index 0000000000..2749f5f958 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingFactory.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification; + +/** + * @version $Rev$ $Date$ + */ +public interface NotificationBindingFactory { + + NotificationBinding createNotificationBinding(); +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingImpl.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingImpl.java new file mode 100644 index 0000000000..992cbb6c2d --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingImpl.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification; + +import java.net.URI; + +/** + * @version $Rev$ $Date$ + */ +public class NotificationBindingImpl implements NotificationBinding { + private String name; + private String uri; + protected String ntmAddress; + protected URI notificationType; + + @Override + public Object clone() { + return this; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getURI() { + return uri; + } + + public void setURI(String uri) { + this.uri = uri; + } + + public String getNtmAddress() { + return ntmAddress; + } + + public void setNtmAddress(String ntmAddress) { + this.ntmAddress = ntmAddress; + } + + public URI getNotificationType() { + return notificationType; + } + + public void setNotificationType(URI notificationType) { + this.notificationType = notificationType; + } + + public void setUnresolved(boolean unresolved) { + } + + public boolean isUnresolved() { + return false; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingModuleActivator.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingModuleActivator.java new file mode 100644 index 0000000000..2cefaa8bba --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingModuleActivator.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification; + +import org.apache.tuscany.sca.assembly.AssemblyFactory; +import org.apache.tuscany.sca.assembly.DefaultAssemblyFactory; +import org.apache.tuscany.sca.binding.notification.encoding.DefaultEncodingRegistry; +import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessorExtensionPoint; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.ModuleActivator; +import org.apache.tuscany.sca.host.http.ExtensibleServletHost; +import org.apache.tuscany.sca.host.http.ServletHost; +import org.apache.tuscany.sca.host.http.ServletHostExtensionPoint; +import org.apache.tuscany.sca.policy.DefaultPolicyFactory; +import org.apache.tuscany.sca.policy.PolicyFactory; +import org.apache.tuscany.sca.provider.ProviderFactoryExtensionPoint; + +/** + * @version $Rev$ $Date$ + */ +public class NotificationBindingModuleActivator implements ModuleActivator { + + private NotificationBindingProcessor bindingProcessor; + + private DefaultEncodingRegistry encodingRegistry; + private ServletHost servletHost; + + + public void start(ExtensionPointRegistry registry) { + encodingRegistry = new DefaultEncodingRegistry(); + servletHost = new ExtensibleServletHost(registry.getExtensionPoint(ServletHostExtensionPoint.class)); + + AssemblyFactory assemblyFactory = new DefaultAssemblyFactory(); + PolicyFactory policyFactory = new DefaultPolicyFactory(); + DefaultNotificationBindingFactory bindingFactory = new DefaultNotificationBindingFactory(); + bindingProcessor = new NotificationBindingProcessor(assemblyFactory, policyFactory, bindingFactory); + StAXArtifactProcessorExtensionPoint processors = registry.getExtensionPoint(StAXArtifactProcessorExtensionPoint.class); + processors.addArtifactProcessor(bindingProcessor); + + NotificationBindingProviderFactory nbpf = new NotificationBindingProviderFactory(servletHost, + encodingRegistry); + ProviderFactoryExtensionPoint providerFactories = registry.getExtensionPoint(ProviderFactoryExtensionPoint.class); + providerFactories.addProviderFactory(nbpf); + } + + public void stop(ExtensionPointRegistry registry) { + encodingRegistry.stop(); + StAXArtifactProcessorExtensionPoint processors = registry.getExtensionPoint(StAXArtifactProcessorExtensionPoint.class); + processors.removeArtifactProcessor(bindingProcessor); + } + +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingProcessor.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingProcessor.java new file mode 100644 index 0000000000..0c9e1df1bc --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingProcessor.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification; + +import java.net.URI; +import java.net.URISyntaxException; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +import org.apache.tuscany.sca.assembly.AssemblyFactory; +import org.apache.tuscany.sca.assembly.xml.Constants; +import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessor; +import org.apache.tuscany.sca.contribution.resolver.ModelResolver; +import org.apache.tuscany.sca.contribution.service.ContributionReadException; +import org.apache.tuscany.sca.contribution.service.ContributionResolveException; +import org.apache.tuscany.sca.contribution.service.ContributionWriteException; +import org.apache.tuscany.sca.policy.PolicyFactory; + +/** + * @version $Rev$ $Date$ + */ +public class NotificationBindingProcessor implements StAXArtifactProcessor<NotificationBinding> { + + protected static final QName BINDING_NOTIFICATION = new QName(Constants.SCA10_TUSCANY_NS, "binding.notification"); + + private NotificationBindingFactory bindingFactory; + + public NotificationBindingProcessor(AssemblyFactory assemblyFactory, + PolicyFactory policyFactory, + NotificationBindingFactory bindingFactory) { + this.bindingFactory = bindingFactory; + } + + public QName getArtifactType() { + return BINDING_NOTIFICATION; + } + + public Class<NotificationBinding> getModelType() { + return NotificationBinding.class; + } + + public NotificationBinding read(XMLStreamReader reader) throws ContributionReadException, XMLStreamException { + assert BINDING_NOTIFICATION.equals(reader.getName()); + String bindingUri = reader.getAttributeValue(null, "uri"); + String name = reader.getAttributeValue(null, "name"); + String ntm = reader.getAttributeValue(null, "ntm"); + String notificationType = reader.getAttributeValue(null, "notificationType"); + + NotificationBinding binding = bindingFactory.createNotificationBinding(); + if (name != null) { + binding.setName(name); + } + if (bindingUri != null) { + binding.setURI(bindingUri); + } + if (ntm != null) { + binding.setNtmAddress(ntm); + } + if (notificationType != null) { + try { + binding.setNotificationType(new URI(notificationType)); + } catch(URISyntaxException e) { + throw new ContributionReadException(e); + } + } + return binding; + } + + public void write(NotificationBinding notificationBinding, XMLStreamWriter writer) + throws ContributionWriteException, XMLStreamException { + + //FIXME Implement this method + } + + public void resolve(NotificationBinding notificationBinding, ModelResolver resolver) throws ContributionResolveException { + } + +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingProviderFactory.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingProviderFactory.java new file mode 100644 index 0000000000..7592f4f824 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingProviderFactory.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification; + +import java.net.InetAddress; +import java.net.URI; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.tuscany.sca.binding.notification.encoding.AbstractEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.BrokerConsumerReferenceEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.BrokerEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.BrokerID; +import org.apache.tuscany.sca.binding.notification.encoding.BrokerIDEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.BrokerProducerReferenceEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.BrokersEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.ConnectionOverrideEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.ConnectionOverrideResponseEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.ConsumerReferenceEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.EncodingRegistry; +import org.apache.tuscany.sca.binding.notification.encoding.EndConsumersEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.EndProducersEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.EndpointAddressEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.EndpointReference; +import org.apache.tuscany.sca.binding.notification.encoding.EndpointReferenceEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.NeighborBrokerConsumersEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.NeighborsEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.NewBrokerAckEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.NewBrokerEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.NewBrokerResponseEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.NewConsumerEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.NewConsumerResponseEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.NewProducerEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.NewProducerResponseEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.ReferencePropertiesEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.RemoveBrokerEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.RemovedBrokerEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.ReplaceBrokerConnectionEnDeCoder; +import org.apache.tuscany.sca.binding.notification.encoding.SubscribeEnDeCoder; +import org.apache.tuscany.sca.host.http.ServletHost; +import org.apache.tuscany.sca.provider.BindingProviderFactory; +import org.apache.tuscany.sca.provider.ReferenceBindingProvider; +import org.apache.tuscany.sca.provider.ServiceBindingProvider; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.apache.tuscany.sca.runtime.RuntimeComponentReference; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; + +/** + * The runtime representation of the notification reference binding + * + * @version $Rev$ $Date$ + */ +public class NotificationBindingProviderFactory implements BindingProviderFactory<NotificationBinding>, + NotificationBrokerManager { + private static final String DEFAULT_PORT = "8083"; + + private ServletHost servletHost; + private NotificationTypeManagerImpl notificationTypeManager; + private EncodingRegistry encodingRegistry; + private String httpUrl; + private Map<URI, NotificationReferenceBindingProvider> referenceBindingProviders; + private Map<URI, NotificationServiceBindingProvider> serviceBindingProviders; + + private static NotificationBindingProviderFactory factoryInstance = null; + + public NotificationBindingProviderFactory(ServletHost servletHost, EncodingRegistry encodingRegistry) { + this.servletHost = servletHost; + this.encodingRegistry = encodingRegistry; + this.referenceBindingProviders = new HashMap<URI, NotificationReferenceBindingProvider>(); + this.serviceBindingProviders = new HashMap<URI, NotificationServiceBindingProvider>(); + + factoryInstance = this; + } + + public Class<NotificationBinding> getModelType() { + return NotificationBinding.class; + } + + public ReferenceBindingProvider createReferenceBindingProvider(RuntimeComponent component, + RuntimeComponentReference reference, + NotificationBinding binding) { + init(); + URI notificationType = binding.getNotificationType(); + if (!validReferenceBinding(binding)) { + throw new RuntimeException("Binding not valid"); + } + NotificationReferenceBindingProvider referenceBindingProvider = referenceBindingProviders.get(notificationType); + if (referenceBindingProvider == null) { + referenceBindingProvider = + new NotificationReferenceBindingProvider(binding, component, reference, servletHost, + notificationTypeManager, encodingRegistry, httpUrl, this); + referenceBindingProviders.put(notificationType, referenceBindingProvider); + } + return referenceBindingProvider; + } + + public ServiceBindingProvider createServiceBindingProvider(RuntimeComponent component, + RuntimeComponentService service, + NotificationBinding binding) { + init(); + URI notificationType = binding.getNotificationType(); + if (!validServiceBinding(binding)) { + throw new RuntimeException("Binding not valid"); + } + NotificationServiceBindingProvider serviceBindingProvider = serviceBindingProviders.get(notificationType); + if (serviceBindingProvider == null) { + serviceBindingProvider = + new NotificationServiceBindingProvider(binding, component, service, servletHost, + notificationTypeManager, encodingRegistry, httpUrl, this); + serviceBindingProviders.put(notificationType, serviceBindingProvider); + } + return serviceBindingProvider; + } + + private boolean validServiceBinding(NotificationBinding binding) { + URI notificationType = binding.getNotificationType(); + NotificationReferenceBindingProvider referenceBindingProvider = referenceBindingProviders.get(notificationType); + if (referenceBindingProvider != null) { + return validBinding(binding, referenceBindingProvider.getBinding()); + } + return true; + } + + private boolean validReferenceBinding(NotificationBinding binding) { + URI notificationType = binding.getNotificationType(); + NotificationServiceBindingProvider serviceBindingProvider = serviceBindingProviders.get(notificationType); + if (serviceBindingProvider != null) { + return validBinding(binding, serviceBindingProvider.getBinding()); + } + return true; + } + + private boolean validBinding(NotificationBinding binding1, NotificationBinding binding2) { + String binding1NtmAddress = binding1.getNtmAddress(); + String binding2NtmAddress = binding2.getNtmAddress(); + if (binding1NtmAddress == null && binding2NtmAddress == null) { + return true; + } else if (binding1NtmAddress == null || binding2NtmAddress == null) { + return false; + } else { + return binding1NtmAddress.equals(binding2NtmAddress); + } + } + + /* + * These methods are intended to be called by the binding providers' start + * methods. By the time this happens, both referenceBindingProvider != null && + * serviceBindingProvider != null, if they are ever going to be + */ + public void serviceProviderStarted(URI notificationType, + NotificationServiceBindingProvider serviceBindingProvider, + URL remoteNtmUrl) { + NotificationReferenceBindingProvider referenceBindingProvider = referenceBindingProviders.get(notificationType); + if (referenceBindingProvider == null) { + serviceBindingProvider.deployConsumer(); + } else if (referenceBindingProvider.isStarted()) { + String brokerID = BrokerID.generate(); + deployBroker(notificationType, serviceBindingProvider, referenceBindingProvider, brokerID, remoteNtmUrl); + } + } + + public void referenceProviderStarted(URI notificationType, + NotificationReferenceBindingProvider referenceBindingProvider, + URL remoteNtmUrl) { + NotificationServiceBindingProvider serviceBindingProvider = serviceBindingProviders.get(notificationType); + if (serviceBindingProvider == null) { + referenceBindingProvider.deployProducer(); + } else if (serviceBindingProvider.isStarted()) { + String brokerID = BrokerID.generate(); + deployBroker(notificationType, serviceBindingProvider, referenceBindingProvider, brokerID, remoteNtmUrl); + } + } + + private void deployBroker(URI notificationType, + NotificationServiceBindingProvider serviceBindingProvider, + NotificationReferenceBindingProvider referenceBindingProvider, + String brokerID, + URL remoteNtmUrl) { + URL consumerUrl = serviceBindingProvider.getURL(); + URL producerUrl = referenceBindingProvider.getURL(); + List<EndpointReference> consumerList = new ArrayList<EndpointReference>(); + List<EndpointReference> producerList = new ArrayList<EndpointReference>(); + boolean firstBroker = + notificationTypeManager.newBroker(notificationType, + consumerUrl, + producerUrl, + brokerID, + remoteNtmUrl, + consumerList, + producerList); + if (firstBroker) { + serviceBindingProvider.deployBroker(brokerID, null, producerList); + referenceBindingProvider.deployBroker(brokerID, null, consumerList); + if (!consumerList.isEmpty() || !producerList.isEmpty()) { + notificationTypeManager.newBrokerAck(remoteNtmUrl); + } + } else { + // returned lists contain broker consumers and producers and are the + // same length + int index = consumerList.size() - 1; + // establish connection with picked broker + EndpointReference brokerConsumerEPR = consumerList.get(index); + EndpointReference brokerProducerEPR = producerList.get(index); + serviceBindingProvider.deployBroker(brokerID, brokerProducerEPR, null); + referenceBindingProvider.deployBroker(brokerID, brokerConsumerEPR, null); + } + } + + public void replaceConsumersBrokerConnection(URI notificationType, EndpointReference chosenBrokerProducerEpr) { + NotificationServiceBindingProvider serviceBindingProvider = serviceBindingProviders.get(notificationType); + if (serviceBindingProvider == null) { + throw new RuntimeException("Missing service binding provider for [" + notificationType + "]"); + } + serviceBindingProvider.replaceBrokerConnection(chosenBrokerProducerEpr); + } + + public static void removeBroker(URI notificationType) { + if (factoryInstance == null) { + throw new RuntimeException("Missing factory instance"); + } + NotificationReferenceBindingProvider referenceBindingProvider = + factoryInstance.referenceBindingProviders.get(notificationType); + NotificationServiceBindingProvider serviceBindingProvider = + factoryInstance.serviceBindingProviders.get(notificationType); + if (referenceBindingProvider == null || serviceBindingProvider == null) { + throw new RuntimeException("Not a broker for [" + notificationType + "]"); + } + referenceBindingProvider.undeployBroker(serviceBindingProvider.getURL()); + } + + private String getBaseURI() { + if (httpUrl == null) { + String httpPort = System.getProperty("notification.httpPort"); + if (httpPort == null) { + httpPort = DEFAULT_PORT; + } + String localHost = null; + try { + localHost = InetAddress.getLocalHost().getCanonicalHostName(); + } catch (Exception e) { + e.printStackTrace(); + localHost = "localhost"; + } + httpUrl = "http://" + localHost + ((httpPort != null) ? (":" + httpPort) : ""); + } + return httpUrl; + } + + @SuppressWarnings("unchecked") + public void init() { + if (notificationTypeManager == null) { + AbstractEnDeCoder subscribeEnDeCoder = new SubscribeEnDeCoder(encodingRegistry); + subscribeEnDeCoder.start(); + AbstractEnDeCoder consumerReferenceEnDeCoder = new ConsumerReferenceEnDeCoder(encodingRegistry); + consumerReferenceEnDeCoder.start(); + AbstractEnDeCoder endpointAddressEnDeCoder = new EndpointAddressEnDeCoder(encodingRegistry); + endpointAddressEnDeCoder.start(); + AbstractEnDeCoder newConsumerEnDeCoder = new NewConsumerEnDeCoder(encodingRegistry); + newConsumerEnDeCoder.start(); + AbstractEnDeCoder newProducerEnDeCoder = new NewProducerEnDeCoder(encodingRegistry); + newProducerEnDeCoder.start(); + AbstractEnDeCoder newConsumerResponseEnDeCoder = new NewConsumerResponseEnDeCoder(encodingRegistry); + newConsumerResponseEnDeCoder.start(); + AbstractEnDeCoder newProducerResponseEnDeCoder = new NewProducerResponseEnDeCoder(encodingRegistry); + newProducerResponseEnDeCoder.start(); + AbstractEnDeCoder newBrokerEnDeCoder = new NewBrokerEnDeCoder(encodingRegistry); + newBrokerEnDeCoder.start(); + AbstractEnDeCoder brokerConsumerReferenceEnDeCoder = new BrokerConsumerReferenceEnDeCoder(encodingRegistry); + brokerConsumerReferenceEnDeCoder.start(); + AbstractEnDeCoder brokerProducerReferenceEnDeCoder = new BrokerProducerReferenceEnDeCoder(encodingRegistry); + brokerProducerReferenceEnDeCoder.start(); + AbstractEnDeCoder newBrokerResponseEnDeCoder = new NewBrokerResponseEnDeCoder(encodingRegistry); + newBrokerResponseEnDeCoder.start(); + AbstractEnDeCoder brokersEnDeCoder = new BrokersEnDeCoder(encodingRegistry); + brokersEnDeCoder.start(); + AbstractEnDeCoder brokerEnDeCoder = new BrokerEnDeCoder(encodingRegistry); + brokerEnDeCoder.start(); + AbstractEnDeCoder endConsumersEnDeCoder = new EndConsumersEnDeCoder(encodingRegistry); + endConsumersEnDeCoder.start(); + AbstractEnDeCoder endProducersEnDeCoder = new EndProducersEnDeCoder(encodingRegistry); + endProducersEnDeCoder.start(); + AbstractEnDeCoder endpointReferenceEnDeCoder = new EndpointReferenceEnDeCoder(encodingRegistry); + endpointReferenceEnDeCoder.start(); + AbstractEnDeCoder referencePropertiesEnDeCoder = new ReferencePropertiesEnDeCoder(encodingRegistry); + referencePropertiesEnDeCoder.start(); + AbstractEnDeCoder brokerIDEnDeCoder = new BrokerIDEnDeCoder(encodingRegistry); + brokerIDEnDeCoder.start(); + AbstractEnDeCoder connectionOverrideEnDeCoder = new ConnectionOverrideEnDeCoder(encodingRegistry); + connectionOverrideEnDeCoder.start(); + AbstractEnDeCoder connectionOverrideResponseEnDeCoder = + new ConnectionOverrideResponseEnDeCoder(encodingRegistry); + connectionOverrideResponseEnDeCoder.start(); + AbstractEnDeCoder newBrokerAckEnDeCoder = new NewBrokerAckEnDeCoder(encodingRegistry); + newBrokerAckEnDeCoder.start(); + AbstractEnDeCoder neighborBrokerConsumersEnDeCoder = new NeighborBrokerConsumersEnDeCoder(encodingRegistry); + neighborBrokerConsumersEnDeCoder.start(); + AbstractEnDeCoder removeBrokerEnDeCoder = new RemoveBrokerEnDeCoder(encodingRegistry); + removeBrokerEnDeCoder.start(); + AbstractEnDeCoder removedBrokerEnDeCoder = new RemovedBrokerEnDeCoder(encodingRegistry); + removedBrokerEnDeCoder.start(); + AbstractEnDeCoder neighborsEnDeCoder = new NeighborsEnDeCoder(encodingRegistry); + neighborsEnDeCoder.start(); + AbstractEnDeCoder replaceBrokerConnectionEnDeCoder = new ReplaceBrokerConnectionEnDeCoder(encodingRegistry); + replaceBrokerConnectionEnDeCoder.start(); + + notificationTypeManager = new NotificationTypeManagerImpl(); + notificationTypeManager.setServletHost(servletHost); + notificationTypeManager.setEncodingRegistry(encodingRegistry); + notificationTypeManager.init(); + + getBaseURI(); + } + } + +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBrokerManager.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBrokerManager.java new file mode 100644 index 0000000000..f088025a23 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBrokerManager.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification; + +import java.net.URI; +import java.net.URL; + +import org.apache.tuscany.sca.binding.notification.encoding.EndpointReference; + +/** + * @version $Rev$ $Date$ + */ +public interface NotificationBrokerManager { + + void serviceProviderStarted(URI notificationType, + NotificationServiceBindingProvider serviceBindingProvider, + URL remoteNtmUrl); + + void referenceProviderStarted(URI notificationType, + NotificationReferenceBindingProvider referenceBindingProvider, + URL remoteNtmUrl); + + void replaceConsumersBrokerConnection(URI notificationType, EndpointReference chosenBrokerProducerEpr); +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingInvoker.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingInvoker.java new file mode 100644 index 0000000000..b594fde29f --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingInvoker.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification; + +import java.io.OutputStream; +import java.util.HashMap; + +import org.apache.axiom.om.OMElement; +import org.apache.tuscany.sca.binding.notification.NotificationReferenceBindingProvider.SubscriberInfo; +import org.apache.tuscany.sca.binding.notification.encoding.Constants; +import org.apache.tuscany.sca.binding.notification.util.IOUtils; +import org.apache.tuscany.sca.binding.notification.util.IOUtils.IOUtilsException; +import org.apache.tuscany.sca.binding.notification.util.IOUtils.Writeable; +import org.apache.tuscany.sca.implementation.notification.ImmutableMessage; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; + +/** + * Turns invoke into remote message fan-out + * + * @version $Rev$ $Date$ + */ +public class NotificationReferenceBindingInvoker implements Invoker { + + private static final Message RESPONSE = new ImmutableMessage(); + private Operation operation; + + private NotificationReferenceBindingProvider notificationReferenceBindingProvider; + + public NotificationReferenceBindingInvoker(Operation operation, + NotificationReferenceBindingProvider notificationReferenceBindingProvider) { + this.operation = operation; + this.notificationReferenceBindingProvider = notificationReferenceBindingProvider; + } + + public Message invoke(Message msg) { + Object payload = msg.getBody(); + if (payload == null) { + throw new RuntimeException("Message body is null"); + } + Writeable writeable = null; + String incomingBrokerID = null; + if (payload.getClass().isArray()) { + Object[] bodyArray = (Object[])payload; + if (bodyArray.length == 3) { + writeable = getWriteableFromByteArray((byte[])bodyArray[1]); + incomingBrokerID = (String)bodyArray[2]; + } + else if (bodyArray.length == 1) { + writeable = getWriteableFromPayload(bodyArray[0]); + } + else { + throw new RuntimeException("Invalid body array size"); + } + } + else { + writeable = getWriteableFromPayload(payload); + } + + try { + for (SubscriberInfo subscriber : notificationReferenceBindingProvider.getSubscribers()) { + // check for each subscriber's broker id and skip if equal + if (incomingBrokerID != null && subscriber.brokerID != null && incomingBrokerID.equals(subscriber.brokerID)) { + continue; + } + HashMap<String, String> headers = new HashMap<String, String>(); + headers.put(IOUtils.Notification_Operation, operation.getName()); + String brokerID = notificationReferenceBindingProvider.getBrokerID(); + if (brokerID != null) { + headers.put(Constants.Broker_ID, brokerID); + } + IOUtils.sendHttpRequest(subscriber.address, headers, writeable, null); + } + } catch(Exception e) { + e.printStackTrace(); + throw new RuntimeException("Sender caught exception", e); + } + return RESPONSE; + } + + private Writeable getWriteableFromPayload(Object payload) throws RuntimeException { + if (!(payload instanceof OMElement)) { + throw new RuntimeException("payload not OMElement"); + } + final OMElement element = (OMElement)payload; + Writeable writeable = new Writeable() { + public void write(OutputStream os) throws IOUtilsException { + try { + element.serialize(os); + os.flush(); + } + catch(Exception e) { + throw new IOUtilsException(e); + } + } + }; + return writeable; + } + + private Writeable getWriteableFromByteArray(final byte[] payload) { + Writeable writeable = new Writeable() { + public void write(OutputStream os) throws IOUtilsException { + try { + os.write(payload); + os.flush(); + } + catch(Exception e) { + throw new IOUtilsException(e); + } + } + }; + return writeable; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingProvider.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingProvider.java new file mode 100644 index 0000000000..59ab37b2eb --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingProvider.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification; + +import java.net.URI; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import javax.servlet.ServletInputStream; +import javax.servlet.ServletOutputStream; + +import org.apache.axiom.om.OMElement; +import org.apache.tuscany.sca.binding.notification.encoding.Broker; +import org.apache.tuscany.sca.binding.notification.encoding.BrokerID; +import org.apache.tuscany.sca.binding.notification.encoding.ConnectionOverride; +import org.apache.tuscany.sca.binding.notification.encoding.Constants; +import org.apache.tuscany.sca.binding.notification.encoding.EncodingObject; +import org.apache.tuscany.sca.binding.notification.encoding.EncodingRegistry; +import org.apache.tuscany.sca.binding.notification.encoding.EncodingUtils; +import org.apache.tuscany.sca.binding.notification.encoding.EndpointReference; +import org.apache.tuscany.sca.binding.notification.encoding.ReplaceBrokerConnection; +import org.apache.tuscany.sca.binding.notification.encoding.Subscribe; +import org.apache.tuscany.sca.binding.notification.util.NotificationServlet; +import org.apache.tuscany.sca.binding.notification.util.URIUtil; +import org.apache.tuscany.sca.binding.notification.util.NotificationServlet.NotificationServletStreamHandler; +import org.apache.tuscany.sca.host.http.ServletHost; +import org.apache.tuscany.sca.interfacedef.Interface; +import org.apache.tuscany.sca.interfacedef.InterfaceContract; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.provider.ReferenceBindingProvider; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.apache.tuscany.sca.runtime.RuntimeComponentReference; + +/** + * The runtime representaion of the notification reference binding + * + * @version $Rev$ $Date$ + */ +public class NotificationReferenceBindingProvider + implements ReferenceBindingProvider, NotificationServletStreamHandler { + + private static final String producerPathBase = "/producer"; + private NotificationReferenceBindingInvoker invoker; + private RuntimeComponentReference reference; + private NotificationBinding notificationBinding; + private ServletHost servletHost; + private NotificationTypeManager ntm; + private EncodingRegistry encodingRegistry; + private URI notificationType; + private URL myUrl; + private URL remoteNtmUrl; + private boolean started; + private NotificationBrokerManager brokerManager; + + private List<SubscriberInfo> subscribers; + private String brokerID; + + public NotificationReferenceBindingProvider(NotificationBinding notificationBinding, + RuntimeComponent component, + RuntimeComponentReference reference, + ServletHost servletHost, + NotificationTypeManager ntm, + EncodingRegistry encodingRegistry, + String httpUrl, + NotificationBrokerManager brokerManager) { + this.invoker = null; + this.notificationBinding = notificationBinding; + this.reference = reference; + this.servletHost = servletHost; + this.ntm = ntm; + this.encodingRegistry = encodingRegistry; + this.notificationType = notificationBinding.getNotificationType(); + String ntmAddress = notificationBinding.getNtmAddress(); + String notificationTypePath = URIUtil.getPath(notificationType); + try { + this.myUrl = new URL(httpUrl + producerPathBase + notificationTypePath); + this.remoteNtmUrl = null; + if (ntmAddress != null && notificationType != null) { + remoteNtmUrl = new URL(ntmAddress + notificationTypePath); + } + } + catch(Exception e) { + throw new RuntimeException(e); + } + this.started = false; + this.brokerManager = brokerManager; + + URI uri = URI.create(component.getURI() + "/" + notificationBinding.getName()); + notificationBinding.setURI(uri.toString()); + Interface interfaze = reference.getInterfaceContract().getInterface(); + interfaze.resetDataBinding(OMElement.class.getName()); + for (Operation operation : interfaze.getOperations()) { + operation.setNonBlocking(false); + } + + this.subscribers = new ArrayList<SubscriberInfo>(); + this.brokerID = null; + } + + public NotificationBinding getBinding() { + return notificationBinding; + } + + public URL getURL() { + return myUrl; + } + + public boolean isStarted() { + return started; + } + + public void setBrokerID(String brokerID) { + this.brokerID = brokerID; + } + + public String getBrokerID() { + return brokerID; + } + + public Invoker createInvoker(Operation operation) { + if (invoker == null) { + invoker = new NotificationReferenceBindingInvoker(operation, this); + } + return invoker; + } + + public boolean supportsOneWayInvocation() { + return false; + } + + public InterfaceContract getBindingInterfaceContract() { + return reference.getInterfaceContract(); + } + + public void start() { + if (started) { + return; + } + + brokerManager.referenceProviderStarted(notificationType, this, remoteNtmUrl); + started = true; + } + + public void stop() { + } + + public void deployProducer() { + List<URL> consumerList = new ArrayList<URL>(); + String sequenceType; + try { + sequenceType = ntm.newProducer(notificationType, myUrl, remoteNtmUrl, consumerList); + } catch(Exception e) { + throw new RuntimeException(e); + } + if (Constants.EndConsumers.equals(sequenceType)) { + for (URL consumerUrl : consumerList) { + addSubscriberUrl(consumerUrl); + } + } + else if (Constants.BrokerConsumers.equals(sequenceType)) { + // Pick a broker consumer, for now the first one + URL consumerUrl = consumerList.get(0); + addSubscriberUrl(consumerUrl); + } + + servletHost.addServletMapping(myUrl.toString(), new NotificationServlet(this)); + } + + public void deployBroker(String brokerID, EndpointReference brokerConsumerEPR, List<EndpointReference> consumerList) { + if (brokerConsumerEPR != null) { + addSubscriber(brokerConsumerEPR); + } + if (consumerList != null && !consumerList.isEmpty()) { + for (EndpointReference consumerEPR : consumerList) { + addSubscriber(consumerEPR); + } + } + setBrokerID(brokerID); + servletHost.addServletMapping(myUrl.toString(), new NotificationServlet(this)); + } + + public void undeployBroker(URL brokerConsumerUrl) { + EndpointReference brokerConsumerEpr = EncodingUtils.createEndpointReference(brokerConsumerUrl, getBrokerID()); + ntm.removeBroker(brokerConsumerEpr, getNeighborBrokerConsumerEprs(), remoteNtmUrl); + removeBrokerSubscribers(); + } + + public void handle(Map<String, String> headers, ServletInputStream istream, int contentLength, ServletOutputStream ostream) { + + try { + EncodingObject eo = EncodingUtils.decodeFromStream(encodingRegistry, istream); + if (eo instanceof Subscribe) { + Subscribe sub = (Subscribe)eo; + addSubscriber(sub.getConsumerReference().getReference()); + } + else if (eo instanceof ConnectionOverride) { + ConnectionOverride co = (ConnectionOverride)eo; + replaceSubscribers(co.getBrokerConsumerReference().getReference()); + } + else if (eo instanceof ReplaceBrokerConnection) { + ReplaceBrokerConnection rbc = (ReplaceBrokerConnection)eo; + URL removedBrokerConsumerEpr = rbc.getRemovedBroker().getReference().getEndpointAddress().getAddress(); + if (rbc.getNeighbors() != null) { + int choice = rbc.getNeighbors().getBrokerSequence().size() - 1; + Broker chosenBroker = rbc.getNeighbors().getBrokerSequence().get(choice); + replaceBrokerSubscriber(removedBrokerConsumerEpr, + chosenBroker.getBrokerConsumerReference().getReference()); + brokerManager.replaceConsumersBrokerConnection(notificationType, + chosenBroker.getBrokerProducerReference().getReference()); + } + else { + replaceBrokerSubscriber(removedBrokerConsumerEpr, null); + } + } + else { + throw new RuntimeException("Unknown encoding object"); + } + } catch(Throwable e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + public synchronized List<SubscriberInfo> getSubscribers() { + return subscribers; + } + + private void addSubscriberUrl(URL subscriberUrl) { + addSubscriber(subscriberUrl, null); + } + + private void addSubscriber(EndpointReference subscriberEPR) { + BrokerID brokerID = null; + if (subscriberEPR.getReferenceProperties() != null) { + brokerID = subscriberEPR.getReferenceProperties().getProperty(BrokerID.class); + } + addSubscriber(subscriberEPR.getEndpointAddress().getAddress(), (brokerID != null ? brokerID.getID() : null)); + } + + private void addSubscriber(URL address, String brokerID) { + synchronized(this) { + SubscriberInfo si = new SubscriberInfo(address); + si.brokerID = brokerID; + if (subscribers == null) { + subscribers = new ArrayList<SubscriberInfo>(); + } + subscribers.add(si); + } + } + + private void replaceSubscribers(EndpointReference brokerConsumerEPR) { + synchronized(this) { + subscribers = null; + } + addSubscriber(brokerConsumerEPR); + } + + private void replaceBrokerSubscriber(URL removedBrokerConsumerUrl, EndpointReference chosenBrokerConsumerEpr) { + synchronized(this) { + if (subscribers == null) { + throw new RuntimeException("No subscribers"); + } + SubscriberInfo siToRemove = null; + for (SubscriberInfo si : subscribers) { + if (si.address.equals(removedBrokerConsumerUrl)) { + siToRemove = si; + } + } + if (siToRemove == null) { + throw new RuntimeException("Can't find info for broker to remove [" + removedBrokerConsumerUrl + "]"); + } + if (!subscribers.remove(siToRemove)) { + throw new RuntimeException("Can't remove info for [" + siToRemove.address + "]"); + } + } + if (chosenBrokerConsumerEpr != null) { + addSubscriber(chosenBrokerConsumerEpr); + } + } + + private List<EndpointReference> getNeighborBrokerConsumerEprs() { + synchronized(this) { + if (subscribers == null) { + throw new RuntimeException("No subscribers"); + } + List<EndpointReference> neighborBrokerConsumerEprs = new ArrayList<EndpointReference>(); + for(SubscriberInfo si : subscribers) { + if (si.brokerID != null) { + neighborBrokerConsumerEprs.add(EncodingUtils.createEndpointReference(si.address, si.brokerID)); + } + } + + return neighborBrokerConsumerEprs; + } + } + + private void removeBrokerSubscribers() { + synchronized(this) { + if (subscribers == null) { + throw new RuntimeException("No subscribers"); + } + List<SubscriberInfo> sisToRemove = new ArrayList<SubscriberInfo>(); + for (SubscriberInfo si : subscribers) { + if (si.brokerID != null) { + sisToRemove.add(si); + } + } + for(SubscriberInfo si : sisToRemove) { + if (!subscribers.remove(si)) { + throw new RuntimeException("Can't remove broker subscriber [" + si.address + "]"); + } + } + } + } + + class SubscriberInfo { + public URL address; + public String brokerID; + + public SubscriberInfo(URL address) { + this.address = address; + this.brokerID = null; + } + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java new file mode 100644 index 0000000000..7748fdffe9 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import javax.servlet.ServletInputStream; +import javax.servlet.ServletOutputStream; + +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.impl.builder.StAXOMBuilder; +import org.apache.tuscany.sca.binding.notification.encoding.BrokerConsumerReference; +import org.apache.tuscany.sca.binding.notification.encoding.BrokerID; +import org.apache.tuscany.sca.binding.notification.encoding.ConnectionOverride; +import org.apache.tuscany.sca.binding.notification.encoding.Constants; +import org.apache.tuscany.sca.binding.notification.encoding.ConsumerReference; +import org.apache.tuscany.sca.binding.notification.encoding.EncodingRegistry; +import org.apache.tuscany.sca.binding.notification.encoding.EncodingUtils; +import org.apache.tuscany.sca.binding.notification.encoding.EndpointAddress; +import org.apache.tuscany.sca.binding.notification.encoding.EndpointReference; +import org.apache.tuscany.sca.binding.notification.encoding.ReferenceProperties; +import org.apache.tuscany.sca.binding.notification.encoding.Subscribe; +import org.apache.tuscany.sca.binding.notification.util.IOUtils; +import org.apache.tuscany.sca.binding.notification.util.NotificationServlet; +import org.apache.tuscany.sca.binding.notification.util.URIUtil; +import org.apache.tuscany.sca.binding.notification.util.IOUtils.IOUtilsException; +import org.apache.tuscany.sca.binding.notification.util.IOUtils.Writeable; +import org.apache.tuscany.sca.binding.notification.util.NotificationServlet.NotificationServletStreamHandler; +import org.apache.tuscany.sca.core.invocation.MessageImpl; +import org.apache.tuscany.sca.host.http.ServletHost; +import org.apache.tuscany.sca.interfacedef.Interface; +import org.apache.tuscany.sca.interfacedef.InterfaceContract; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.InvocationChain; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.provider.ServiceBindingProvider; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.apache.tuscany.sca.runtime.RuntimeComponentService; +import org.apache.tuscany.sca.runtime.RuntimeWire; + +/** + * The runtime representaion of the local service binding + * + * @version $Rev$ $Date$ + */ +public class NotificationServiceBindingProvider + implements ServiceBindingProvider, NotificationServletStreamHandler { + + private RuntimeWire wire; + private NotificationBinding notificationBinding; + private RuntimeComponentService service; + private ServletHost servletHost; + private NotificationTypeManager ntm; + private EncodingRegistry encodingRegistry; + private URI notificationType; + private URL myUrl; + private URL remoteNtmUrl; + private static final String consumerPathBase = "/consumer"; + private boolean started; + private NotificationBrokerManager brokerManager; + private String brokerID; + + public NotificationServiceBindingProvider(NotificationBinding notificationBinding, + RuntimeComponent component, + RuntimeComponentService service, + ServletHost servletHost, + NotificationTypeManager ntm, + EncodingRegistry encodingRegistry, + String httpUrl, + NotificationBrokerManager brokerManager) { + this.notificationBinding = notificationBinding; + this.service = service; + this.servletHost = servletHost; + this.ntm = ntm; + this.encodingRegistry = encodingRegistry; + this.notificationType = notificationBinding.getNotificationType(); + String ntmAddress = notificationBinding.getNtmAddress(); + String notificationTypePath = URIUtil.getPath(notificationType); + try { + this.myUrl = new URL(httpUrl + consumerPathBase + notificationTypePath); + remoteNtmUrl = null; + if (ntmAddress != null && notificationType != null) { + remoteNtmUrl = new URL(ntmAddress + notificationTypePath); + } + } + catch(Exception e) { + throw new RuntimeException(e); + } + this.started = false; + this.brokerManager = brokerManager; + this.brokerID = null; + + URI uri = URI.create(component.getURI() + "/" + notificationBinding.getName()); + notificationBinding.setURI(uri.toString()); + Interface interfaze = service.getInterfaceContract().getInterface(); + interfaze.resetDataBinding(OMElement.class.getName()); + for (Operation operation : interfaze.getOperations()) { + operation.setNonBlocking(false); + } + } + + public NotificationBinding getBinding() { + return notificationBinding; + } + + public boolean isStarted() { + return started; + } + + public URL getURL() { + return myUrl; + } + + public InterfaceContract getBindingInterfaceContract() { + return service.getInterfaceContract(); + } + + public boolean supportsOneWayInvocation() { + return false; + } + + public void start() { + if (started) { + return; + } + + RuntimeComponentService componentService = (RuntimeComponentService) service; + wire = componentService.getRuntimeWire(notificationBinding); + + for (InvocationChain ch : wire.getInvocationChains()) { + ch.setAllowsPassByReference(true); + } + + brokerManager.serviceProviderStarted(notificationType, this, remoteNtmUrl); + started = true; + } + + public void stop() { + } + + public void deployConsumer() { + WriteableSubscribe ws = new WriteableSubscribe(myUrl, null); + List<URL> producerList = new ArrayList<URL>(); + String sequenceType = ntm.newConsumer(notificationType, myUrl, remoteNtmUrl, producerList); + if (Constants.EndProducers.equals(sequenceType)) { + for (URL producerUrl : producerList) { + subscribeWithProducer(producerUrl, null, ws); + } + } + else if (Constants.BrokerProducers.equals(sequenceType)) { + // Pick a broker producer, for now the first one + URL producerUrl = producerList.get(0); + subscribeWithProducer(producerUrl, null, ws); + } + + servletHost.addServletMapping(myUrl.toString(), new NotificationServlet(this)); + } + + protected void subscribeWithProducer(URL producerUrl, String brokerID, WriteableSubscribe ws) { + if (ws == null) { + ws = new WriteableSubscribe(myUrl, brokerID); + } + try { + IOUtils.sendHttpRequest(producerUrl, Constants.SUBSCRIBE_OP, ws, null); + } catch(Exception e) { + throw new RuntimeException(e); + } + } + + public void deployBroker(String brokerID, EndpointReference brokerProducerEPR, List<EndpointReference> producerList) { + if (brokerProducerEPR != null) { + subscribeWithProducer(brokerProducerEPR.getEndpointAddress().getAddress(), brokerID, null); + } + this.brokerID = brokerID; + if (producerList != null && !producerList.isEmpty()) { + WriteableConnectionOverride wco = new WriteableConnectionOverride(myUrl, brokerID); + for (EndpointReference producerEPR : producerList) { + try { + IOUtils.sendHttpRequest(producerEPR.getEndpointAddress().getAddress(), Constants.CONNECTION_OVERRIDE_OP, wco, null); + } catch(Exception e) { + throw new RuntimeException(e); + } + } + } + servletHost.addServletMapping(myUrl.toString(), new NotificationServlet(this)); + } + + public void replaceBrokerConnection(EndpointReference chosenBrokerProducerEpr) { + if (brokerID == null) { + throw new RuntimeException("Missing broker id"); + } + URL producerUrl = chosenBrokerProducerEpr.getEndpointAddress().getAddress(); + subscribeWithProducer(producerUrl, brokerID, null); + } + + public void handle(Map<String, String> headers, ServletInputStream istream, int contentLength, ServletOutputStream ostream) { + String opHeader = headers.get(IOUtils.Notification_Operation); + String incomingBrokerID = headers.get(Constants.Broker_ID); + if (opHeader == null) { + throw new RuntimeException("Missing operation header"); + } + if (wire == null) { + throw new RuntimeException("Missing wire"); + } + InvocationChain chain = null; + for (InvocationChain ch : wire.getInvocationChains()) { + // We may want to use more than just the op name + if(ch.getTargetOperation().getName().equals(opHeader)) { + chain = ch; + break; + } + } + if (chain == null) { + throw new RuntimeException("Can't find invocation chain match for [" + opHeader + "]"); + } + byte[] payload = null; + try { + payload = IOUtils.readFully(istream, contentLength); + } + catch(IOException e) { + throw new RuntimeException(e); + } + Object[] args = getArgsFromByteArray(payload, incomingBrokerID); + + invoke(chain, args); + + // Doing nothing to ostream is equivalent to returning null + } + + private Object[] getArgsFromByteArray(byte[] payload, String incomingBrokerID) { + try { + StAXOMBuilder builder = new StAXOMBuilder(new ByteArrayInputStream(payload)); + OMElement element = builder.getDocumentElement(); + return new Object[] { element }; + } + catch(Exception e) { + throw new RuntimeException(e); + } + } + + protected void invoke(InvocationChain chain, Object[] args) { + Message msg = new MessageImpl(); + msg.setBody(args); + chain.getHeadInvoker().invoke(msg); + } + + class WriteableSubscribe implements Writeable { + + private Subscribe sub; + + public WriteableSubscribe(URL url, String brokerID) { + EndpointAddress epa = new EndpointAddress(); + epa.setAddress(url); + EndpointReference epr = new EndpointReference(); + epr.setEndpointAddress(epa); + if (brokerID != null) { + BrokerID cbi = new BrokerID(); + cbi.setID(brokerID); + ReferenceProperties crp = new ReferenceProperties(); + crp.addProperty(cbi); + epr.setReferenceProperties(crp); + } + ConsumerReference cr = new ConsumerReference(); + cr.setReference(epr); + sub = new Subscribe(); + sub.setConsumerReference(cr); + } + + public void write(OutputStream os) throws IOUtilsException { + EncodingUtils.encodeToStream(encodingRegistry, sub, os); + } + } + + class WriteableConnectionOverride implements Writeable { + + private ConnectionOverride connectionOverride; + + public WriteableConnectionOverride(URL brokerConsumerUrl, String brokerID) { + EndpointAddress epa = new EndpointAddress(); + epa.setAddress(brokerConsumerUrl); + EndpointReference brokerConsumerEPR = new EndpointReference(); + brokerConsumerEPR.setEndpointAddress(epa); + BrokerID cbi = new BrokerID(); + cbi.setID(brokerID); + ReferenceProperties crp = new ReferenceProperties(); + crp.addProperty(cbi); + brokerConsumerEPR.setReferenceProperties(crp); + BrokerConsumerReference brokerConsumerReference = new BrokerConsumerReference(); + brokerConsumerReference.setReference(brokerConsumerEPR); + connectionOverride = new ConnectionOverride(); + connectionOverride.setBrokerConsumerReference(brokerConsumerReference); + } + + public void write(OutputStream os) throws IOUtilsException { + EncodingUtils.encodeToStream(encodingRegistry, connectionOverride, os); + } + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManager.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManager.java new file mode 100644 index 0000000000..81f87dc4da --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManager.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification; + +import java.net.URI; +import java.net.URL; +import java.util.List; + +import org.apache.tuscany.sca.binding.notification.encoding.EndpointReference; + +/** + * @version $Rev$ $Date$ + */ +public interface NotificationTypeManager { + + String newConsumer(URI notificationType, URL consumerUrl, URL remoteNtmUrl, List<URL> producerList); + String newProducer(URI notificationType, URL producerUrl, URL remoteNtmUrl, List<URL> consumerList); + boolean newBroker(URI notificationType, + URL consumerUrl, + URL producerUrl, + String brokerID, + URL remoteNtmUrl, + List<EndpointReference> consumerList, + List<EndpointReference> producerList); + void newBrokerAck(URL remoteNtmUrl); + void removeBroker(EndpointReference brokerConsumerEpr, List<EndpointReference> neighborBrokerConsumerEprs, URL remoteNtmUrl); +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManagerImpl.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManagerImpl.java new file mode 100644 index 0000000000..6e61f82042 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManagerImpl.java @@ -0,0 +1,692 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification; + +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.servlet.ServletInputStream; +import javax.servlet.ServletOutputStream; + +import org.apache.tuscany.sca.binding.notification.encoding.Broker; +import org.apache.tuscany.sca.binding.notification.encoding.BrokerConsumerReference; +import org.apache.tuscany.sca.binding.notification.encoding.BrokerID; +import org.apache.tuscany.sca.binding.notification.encoding.BrokerProducerReference; +import org.apache.tuscany.sca.binding.notification.encoding.Brokers; +import org.apache.tuscany.sca.binding.notification.encoding.Constants; +import org.apache.tuscany.sca.binding.notification.encoding.EncodingException; +import org.apache.tuscany.sca.binding.notification.encoding.EncodingObject; +import org.apache.tuscany.sca.binding.notification.encoding.EncodingRegistry; +import org.apache.tuscany.sca.binding.notification.encoding.EncodingUtils; +import org.apache.tuscany.sca.binding.notification.encoding.EndConsumers; +import org.apache.tuscany.sca.binding.notification.encoding.EndProducers; +import org.apache.tuscany.sca.binding.notification.encoding.EndpointAddress; +import org.apache.tuscany.sca.binding.notification.encoding.EndpointReference; +import org.apache.tuscany.sca.binding.notification.encoding.EndpointReferenceWrapper; +import org.apache.tuscany.sca.binding.notification.encoding.NeighborBrokerConsumers; +import org.apache.tuscany.sca.binding.notification.encoding.Neighbors; +import org.apache.tuscany.sca.binding.notification.encoding.NewBroker; +import org.apache.tuscany.sca.binding.notification.encoding.NewBrokerAck; +import org.apache.tuscany.sca.binding.notification.encoding.NewBrokerResponse; +import org.apache.tuscany.sca.binding.notification.encoding.NewConsumer; +import org.apache.tuscany.sca.binding.notification.encoding.NewConsumerResponse; +import org.apache.tuscany.sca.binding.notification.encoding.NewProducer; +import org.apache.tuscany.sca.binding.notification.encoding.NewProducerResponse; +import org.apache.tuscany.sca.binding.notification.encoding.RemoveBroker; +import org.apache.tuscany.sca.binding.notification.encoding.RemovedBroker; +import org.apache.tuscany.sca.binding.notification.encoding.ReplaceBrokerConnection; +import org.apache.tuscany.sca.binding.notification.util.IOUtils; +import org.apache.tuscany.sca.binding.notification.util.NotificationServlet; +import org.apache.tuscany.sca.binding.notification.util.URIUtil; +import org.apache.tuscany.sca.binding.notification.util.IOUtils.IOUtilsException; +import org.apache.tuscany.sca.binding.notification.util.IOUtils.ReadableContinuation; +import org.apache.tuscany.sca.binding.notification.util.IOUtils.Writeable; +import org.apache.tuscany.sca.binding.notification.util.NotificationServlet.NotificationServletStreamHandler; +import org.apache.tuscany.sca.host.http.ServletHost; + +/** + * A notification type manager serves as a registry of producers and consumers, or brokers, for + * any notification type. This class implements an interface that allows a reference provider + * (a producer), a service provider (a consumer), or both (a broker, via the provider factory), + * to access locally the ntm for its notification type, regardless of whether the ntm resides + * locally or remotely. + * At a given host there is only one reference provider and/or one service provider for any given + * notification type. So, if the ntm for a notification type resides locally, then it is invoked + * exclusively by either a reference provider (newProducer), a service provider (newConsumer), or + * a provider factory (newBroker). And since these invocations occur when the providers are being + * created then all three of consumerLists, producerLists and brokerLists must be null when these + * invocations occur. + * + * @version $Rev$ $Date$ + */ +public class NotificationTypeManagerImpl implements NotificationTypeManager { + + private static final String ntmPathBase = "/ntm"; + + private ServletHost servletHost; + private EncodingRegistry encodingRegistry; + private Map<URI, NotificationTypeManagerHandler> ntmHandlers; + + public NotificationTypeManagerImpl() { + } + + public void setServletHost(ServletHost servletHost) { + this.servletHost = servletHost; + } + + public void setEncodingRegistry(EncodingRegistry encodingRegistry) { + this.encodingRegistry = encodingRegistry; + } + + public void init() { + ntmHandlers = new HashMap<URI, NotificationTypeManagerHandler>(); + } + + public String newConsumer(URI notificationType, URL consumerUrl, URL remoteNtmUrl, List<URL> producerListResult) { + if (ntmUrlIsRemote(consumerUrl, remoteNtmUrl)) { + try { + WriteableEPW wEPW = new WriteableEPW(new NewConsumer(), consumerUrl); + InputStreamDecoder isd = new InputStreamDecoder(); + NewConsumerResponse ncr = + (NewConsumerResponse)IOUtils.sendHttpRequest(remoteNtmUrl, Constants.NEW_CONSUMER_OP, wEPW, isd); + String sequenceType = ncr.getSequenceType(); + if (Constants.EndProducers.equals(sequenceType) || Constants.BrokerProducers.equals(sequenceType)) { + for (EndpointReference epr : ncr.getReferenceSequence()) { + producerListResult.add(epr.getEndpointAddress().getAddress()); + } + } + return sequenceType; + } catch(Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + else { + NotificationTypeManagerHandler ntmHandler = ntmHandlers.get(notificationType); + if (ntmHandler != null) { + throw new RuntimeException("Trying to deploy local consumer with existing local producer, consumer or broker"); + } + + createNtmHandler(consumerUrl.getAuthority(), notificationType, consumerUrl, null, null); + + return Constants.NoProducers; + } + } + + private void createNtmHandler(String ntmUriAuthority, URI notificationType, URL consumerUrl, URL producerUrl, BrokerStruct broker) { + String ntmUri = "http://" + ntmUriAuthority + ntmPathBase + URIUtil.getPath(notificationType); + NotificationTypeManagerHandler ntmh = new NotificationTypeManagerHandler(notificationType, consumerUrl, producerUrl, broker); + ntmHandlers.put(notificationType, ntmh); + servletHost.addServletMapping(ntmUri, new NotificationServlet(ntmh)); + } + + public String newProducer(URI notificationType, URL producerUrl, URL remoteNtmUrl, List<URL> consumerListResult) { + if (ntmUrlIsRemote(producerUrl, remoteNtmUrl)) { + try { + WriteableEPW wEPW = new WriteableEPW(new NewProducer(), producerUrl); + InputStreamDecoder isd = new InputStreamDecoder(); + NewProducerResponse npr = + (NewProducerResponse)IOUtils.sendHttpRequest(remoteNtmUrl, Constants.NEW_PRODUCER_OP, wEPW, isd); + String sequenceType = npr.getSequenceType(); + if (Constants.EndConsumers.equals(sequenceType) || Constants.BrokerConsumers.equals(sequenceType)) { + for (EndpointReference epr : npr.getReferenceSequence()) { + consumerListResult.add(epr.getEndpointAddress().getAddress()); + } + } + return sequenceType; + } catch(Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + else { + NotificationTypeManagerHandler ntmHandler = ntmHandlers.get(notificationType); + if (ntmHandler != null) { + throw new RuntimeException("Trying to deploy local producer with existing local producer, consumer or broker"); + } + + createNtmHandler(producerUrl.getAuthority(), notificationType, null, producerUrl, null); + + return Constants.NoConsumers; + } + } + + public boolean newBroker(URI notificationType, + URL consumerUrl, + URL producerUrl, + String brokerID, + URL remoteNtmUrl, + List<EndpointReference> consumerListResult, + List<EndpointReference> producerListResult) { + String ntmUriAuthority = producerUrl.getAuthority(); + if (!ntmUriAuthority.equals(consumerUrl.getAuthority())) { + throw new RuntimeException("Producer url and consumer url do not match"); + } + if (ntmUrlIsRemote(producerUrl, remoteNtmUrl)) { + try { + WriteableNewBroker wnb = new WriteableNewBroker(consumerUrl, producerUrl, brokerID); + InputStreamDecoder isd = new InputStreamDecoder(); + NewBrokerResponse nbr = + (NewBrokerResponse)IOUtils.sendHttpRequest(remoteNtmUrl, Constants.NEW_BROKER_OP, wnb, isd); + if (nbr.isFirstBroker()) { + if (nbr.getEndConsumers().getSequenceType().equals(Constants.EndConsumers)) { + for (EndpointReference epr : nbr.getEndConsumers().getReferenceSequence()) { + consumerListResult.add(epr); + } + } + if (nbr.getEndProducers().getSequenceType().equals(Constants.EndProducers)) { + for (EndpointReference epr : nbr.getEndProducers().getReferenceSequence()) { + producerListResult.add(epr); + } + } + } + else { + for (Broker broker : nbr.getBrokers().getBrokerSequence()) { + consumerListResult.add(broker.getBrokerConsumerReference().getReference()); + producerListResult.add(broker.getBrokerProducerReference().getReference()); + } + } + return nbr.isFirstBroker(); + } catch(Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + else { + NotificationTypeManagerHandler ntmHandler = ntmHandlers.get(notificationType); + if (ntmHandler != null) { + throw new RuntimeException("Trying to deploy local broker with existing local producer, consumer or broker"); + } + + BrokerStruct broker = new BrokerStruct(consumerUrl, producerUrl, brokerID); + createNtmHandler(ntmUriAuthority, notificationType, null, null, broker); + + return true; + } + } + + private boolean ntmUrlIsRemote(URL localUrl, URL ntmUrl) { + if (ntmUrl == null) { + return false; + } + if (localUrl.getPort() != ntmUrl.getPort()) { + return true; + } + String remoteNtmUrlAuthority = ntmUrl.getAuthority(); + if (remoteNtmUrlAuthority.indexOf("localhost") >= 0) { + return false; + } + return !localUrl.getAuthority().equals(remoteNtmUrlAuthority); + } + + public void newBrokerAck(URL remoteNtmUrl) { + try { + IOUtils.sendHttpRequest(remoteNtmUrl, Constants.NEW_BROKER_ACK_OP, new WriteableNewBrokerAck(), null); + } catch(Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + public void removeBroker(EndpointReference brokerConsumerEpr, List<EndpointReference> neighborBrokerConsumerEprs, URL remoteNtmUrl) { + WriteableRemoveBroker wrb = new WriteableRemoveBroker(brokerConsumerEpr, neighborBrokerConsumerEprs); + + try { + IOUtils.sendHttpRequest(remoteNtmUrl, Constants.REMOVE_BROKER_OP, wrb, null); + } catch(Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + private class NotificationTypeManagerHandler implements NotificationServletStreamHandler { + + private URI notificationType; + List<URL> consumerList; + List<URL> producerList; + List<BrokerStruct> brokerList; + private NotificationTypeLock notificationTypeLock; + private BrokerStruct pendingBroker; + + public NotificationTypeManagerHandler(URI notificationType) { + this.notificationType = notificationType; + this.notificationTypeLock = new NotificationTypeLock(); + this.pendingBroker = null; + } + + public NotificationTypeManagerHandler(URI notificationType, URL consumerUrl, URL producerUrl, BrokerStruct broker) { + this(notificationType); + if (consumerUrl != null) { + addConsumer(consumerUrl); + } + else if (producerUrl != null) { + addProducer(producerUrl); + } + else if (broker != null) { + addBroker(broker); + } + } + + private void addConsumer(URL consumerUrl) { + if (consumerList == null) { + consumerList = new ArrayList<URL>(); + } + consumerList.add(consumerUrl); + } + + private void addProducer(URL producerUrl) { + if (producerList == null) { + producerList = new ArrayList<URL>(); + } + producerList.add(producerUrl); + } + + private void addBroker(BrokerStruct broker) { + if (brokerList == null) { + brokerList = new ArrayList<BrokerStruct>(); + } + brokerList.add(broker); + } + + public void handle(Map<String, String> headers, ServletInputStream istream, int contentLength, ServletOutputStream ostream) { + String opHeader = headers.get(IOUtils.Notification_Operation); + EncodingObject eo = null; + try { + eo = EncodingUtils.decodeFromStream(encodingRegistry, istream); + } + catch(EncodingException e) { + throw new RuntimeException(e); + } + + if (Constants.NEW_CONSUMER_OP.equals(opHeader)) { + handleNewConsumer((NewConsumer)eo, ostream); + } + else if(Constants.NEW_PRODUCER_OP.equals(opHeader)) { + handleNewProducer((NewProducer)eo, ostream); + } + else if(Constants.NEW_BROKER_OP.equals(opHeader)) { + handleNewBroker((NewBroker)eo, ostream); + } + else if (Constants.NEW_BROKER_ACK_OP.equals(opHeader)) { + handleNewBrokerAck(); + } + else if (Constants.REMOVE_BROKER_OP.equals(opHeader)) { + handleRemoveBroker((RemoveBroker)eo); + } + } + + private void handleNewConsumer(NewConsumer nc, ServletOutputStream ostream) { + synchronized(notificationTypeLock) { + if (notificationTypeLock.isLocked) { + try { notificationTypeLock.wait(); } catch(InterruptedException e) {} + } + URL consumerUrl = nc.getReference().getEndpointAddress().getAddress(); + if (brokerList == null) { + addConsumer(consumerUrl); + } + + NewConsumerResponse ncr = new NewConsumerResponse(); + if (producerList != null) { + ncr.setSequenceType(Constants.EndProducers); + for (URL producerUrl : producerList) { + ncr.addReferenceToSequence(EncodingUtils.createEndpointReference(producerUrl, null)); + } + } + else if(brokerList != null) { + ncr.setSequenceType(Constants.BrokerProducers); + for (BrokerStruct broker : brokerList) { + ncr.addReferenceToSequence(EncodingUtils.createEndpointReference(broker.producerUrl, null)); + } + } + else { + ncr.setSequenceType(Constants.NoProducers); + } + try { + EncodingUtils.encodeToStream(encodingRegistry, ncr, ostream); + } + catch(IOUtilsException e) { + throw new RuntimeException(e); + } + } + } + + private void handleNewProducer(NewProducer np, ServletOutputStream ostream) { + synchronized(notificationTypeLock) { + if (notificationTypeLock.isLocked) { + try { notificationTypeLock.wait(); } catch(InterruptedException e) {} + } + URL producerUrl = np.getReference().getEndpointAddress().getAddress(); + if (brokerList == null) { + addProducer(producerUrl); + } + + NewProducerResponse npr = new NewProducerResponse(); + if (consumerList != null) { + npr.setSequenceType(Constants.EndConsumers); + for (URL consumerUrl : consumerList) { + npr.addReferenceToSequence(EncodingUtils.createEndpointReference(consumerUrl, null)); + } + } + else if(brokerList != null) { + npr.setSequenceType(Constants.BrokerConsumers); + for (BrokerStruct broker : brokerList) { + npr.addReferenceToSequence(EncodingUtils.createEndpointReference(broker.consumerUrl, null)); + } + } + else { + npr.setSequenceType(Constants.NoConsumers); + } + try { + EncodingUtils.encodeToStream(encodingRegistry, npr, ostream); + } + catch(IOUtilsException e) { + throw new RuntimeException(e); + } + } + } + + private void handleNewBroker(NewBroker nb, ServletOutputStream ostream) { + synchronized(notificationTypeLock) { + if (notificationTypeLock.isLocked) { + try { notificationTypeLock.wait(); } catch(InterruptedException e) {} + } + NewBrokerResponse nbr = new NewBrokerResponse(); + if (consumerList != null || producerList != null || brokerList == null) { + nbr.setFirstBroker(true); + EndConsumers endConsumers = new EndConsumers(); + if (consumerList != null) { + endConsumers.setSequenceType(Constants.EndConsumers); + for (URL consumerUrl : consumerList) { + endConsumers.addReferenceToSequence(EncodingUtils.createEndpointReference(consumerUrl, null)); + } + } + else { + endConsumers.setSequenceType(Constants.NoConsumers); + } + nbr.setEndConsumers(endConsumers); + EndProducers endProducers = new EndProducers(); + if (producerList != null) { + endProducers.setSequenceType(Constants.EndProducers); + for (URL producerUrl : producerList) { + endProducers.addReferenceToSequence(EncodingUtils.createEndpointReference(producerUrl, null)); + } + } + else { + endProducers.setSequenceType(Constants.NoProducers); + } + nbr.setEndProducers(endProducers); + } + else { + nbr.setFirstBroker(false); + Brokers brokers = new Brokers(); + for (BrokerStruct brokerStruct : brokerList) { + Broker brokerElt = new Broker(); + BrokerConsumerReference bcr = new BrokerConsumerReference(); + bcr.setReference(EncodingUtils.createEndpointReference(brokerStruct.consumerUrl, brokerStruct.brokerID)); + brokerElt.setBrokerConsumerReference(bcr); + + BrokerProducerReference bpr = new BrokerProducerReference(); + bpr.setReference(EncodingUtils.createEndpointReference(brokerStruct.producerUrl, brokerStruct.brokerID)); + brokerElt.setBrokerProducerReference(bpr); + brokers.addBrokerToSequence(brokerElt); + } + nbr.setBrokers(brokers); + } + EndpointReference consumerEPR = nb.getBrokerConsumerReference().getReference(); + URL consumerUrl = consumerEPR.getEndpointAddress().getAddress(); + BrokerID consumerBrokerID = consumerEPR.getReferenceProperties().getProperty(BrokerID.class); + EndpointReference producerEPR = nb.getBrokerProducerReference().getReference(); + URL producerUrl = producerEPR.getEndpointAddress().getAddress(); + BrokerID producerBrokerID = producerEPR.getReferenceProperties().getProperty(BrokerID.class); + if (consumerBrokerID == null || + producerBrokerID == null || + !consumerBrokerID.getID().equals(producerBrokerID.getID())) { + throw new RuntimeException("Producer and consumer broker ids do not match"); + } + // only add broker if consumerList == null && producerList == null + // otherwise, make it a pending broker and wait for ack + // TODO block for a configurable amount of time + BrokerStruct broker = new BrokerStruct(consumerUrl, producerUrl, consumerBrokerID.getID()); + if (consumerList == null && producerList == null) { + addBroker(broker); + } + else { + pendingBroker = broker; + notificationTypeLock.isLocked = true; + } + try { + EncodingUtils.encodeToStream(encodingRegistry, nbr, ostream); + } + catch(IOUtilsException e) { + throw new RuntimeException(e); + } + } + } + + private void handleNewBrokerAck() { + synchronized(notificationTypeLock) { + if (!notificationTypeLock.isLocked) { + notificationTypeLock.notifyAll(); + throw new RuntimeException("Notification type should be locked"); + } + if (brokerList != null) { + notificationTypeLock.isLocked = false; + notificationTypeLock.notifyAll(); + throw new RuntimeException("Can't add pending broker to non-empty broker list"); + } + if (pendingBroker == null) { + notificationTypeLock.isLocked = false; + notificationTypeLock.notifyAll(); + throw new RuntimeException("Missing pending broker"); + } + addBroker(pendingBroker); + consumerList = null; + producerList = null; + pendingBroker = null; + notificationTypeLock.isLocked = false; + notificationTypeLock.notifyAll(); + } + } + + private void handleRemoveBroker(RemoveBroker rb) { + synchronized(notificationTypeLock) { + if (notificationTypeLock.isLocked) { + try { notificationTypeLock.wait(); } catch(InterruptedException e) {} + } + + if (brokerList == null) { + throw new RuntimeException("No broker to remove for [" + notificationType + "]"); + } + + NeighborBrokerConsumers nbcs = rb.getNeighborBrokerConsumers(); + EndpointReference rbEpr = rb.getBrokerConsumerReference().getReference(); + if (nbcs != null && nbcs.getReferenceSequence() != null) { + List<Broker> neighborBrokers = new ArrayList<Broker>(); + for (EndpointReference neighborBrokerConsumerEpr : nbcs.getReferenceSequence()) { + BrokerStruct neighborBrokerStruct = null; + URL neighborBrokerConsumerEprUrl = neighborBrokerConsumerEpr.getEndpointAddress().getAddress(); + for (BrokerStruct brokerStruct : brokerList) { + if (brokerStruct.consumerUrl.equals(neighborBrokerConsumerEprUrl)) { + neighborBrokerStruct = brokerStruct; + break; + } + } + if (neighborBrokerStruct == null) { + throw new RuntimeException("Can't find neighbor broker for consumer EPR [" + + neighborBrokerConsumerEprUrl + "]"); + } + BrokerConsumerReference bcr = new BrokerConsumerReference(); + bcr.setReference(EncodingUtils.createEndpointReference(neighborBrokerStruct.consumerUrl, neighborBrokerStruct.brokerID)); + BrokerProducerReference bpr = new BrokerProducerReference(); + bpr.setReference(EncodingUtils.createEndpointReference(neighborBrokerStruct.producerUrl, neighborBrokerStruct.brokerID)); + Broker neighborBroker = new Broker(); + neighborBroker.setBrokerConsumerReference(bcr); + neighborBroker.setBrokerProducerReference(bpr); + neighborBrokers.add(neighborBroker); + } + int lastIndex = neighborBrokers.size() - 1; + for (int index = lastIndex; index >= 0; index--) { + List<Broker> writeableNeighborBrokers = ((index > 0) ? neighborBrokers.subList(0, index) : null); + WriteableReplaceBrokerConnection wrbc = new WriteableReplaceBrokerConnection(rbEpr, writeableNeighborBrokers); + URL targetUrl = + neighborBrokers.get(index).getBrokerProducerReference().getReference().getEndpointAddress().getAddress(); + try { + IOUtils.sendHttpRequest(targetUrl, Constants.REPLACE_BROKER_CONNECTION_OP, wrbc, null); + } catch(Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + } + + BrokerStruct removedBrokerStruct = null; + URL rbEprUrl = rbEpr.getEndpointAddress().getAddress(); + for (BrokerStruct brokerSruct : brokerList) { + if (brokerSruct.consumerUrl.equals(rbEprUrl)) { + removedBrokerStruct = brokerSruct; + break; + } + } + if (removedBrokerStruct == null) { + throw new RuntimeException("Can't find broker to remove for EPR [" + rbEprUrl + "]"); + } + if(!brokerList.remove(removedBrokerStruct)) { + throw new RuntimeException("Broker was not removed"); + } + } + } + } + + class NotificationTypeLock { + public boolean isLocked; + } + + class WriteableEPW implements Writeable { + private EndpointReferenceWrapper epw; + + public WriteableEPW(EndpointReferenceWrapper epw, URL url) { + EndpointAddress epa = new EndpointAddress(); + epa.setAddress(url); + EndpointReference epr = new EndpointReference(); + epr.setEndpointAddress(epa); + epw.setReference(epr); + this.epw = epw; + } + + public void write(OutputStream os) throws IOUtilsException { + EncodingUtils.encodeToStream(encodingRegistry, epw, os); + } + } + + class InputStreamDecoder implements ReadableContinuation { + + public Object read(InputStream istream) throws IOUtilsException { + try { + return EncodingUtils.decodeFromStream(encodingRegistry, istream); + } + catch(EncodingException e) { + throw new IOUtilsException(e); + } + } + } + + class BrokerStruct { + public URL consumerUrl; + public URL producerUrl; + public String brokerID; + + public BrokerStruct(URL consumerUrl, URL producerUrl, String brokerID) { + this.consumerUrl = consumerUrl; + this.producerUrl = producerUrl; + this.brokerID = brokerID; + } + } + + class WriteableNewBroker implements Writeable { + private NewBroker newBroker; + + public WriteableNewBroker(URL consumerUrl, URL producerUrl, String brokerID) { + newBroker = new NewBroker(); + BrokerConsumerReference bcr = new BrokerConsumerReference(); + bcr.setReference(EncodingUtils.createEndpointReference(consumerUrl, brokerID)); + newBroker.setBrokerConsumerReference(bcr); + + BrokerProducerReference bpr = new BrokerProducerReference(); + bpr.setReference(EncodingUtils.createEndpointReference(producerUrl, brokerID)); + newBroker.setBrokerProducerReference(bpr); + } + + public void write(OutputStream os) throws IOUtilsException { + EncodingUtils.encodeToStream(encodingRegistry, newBroker, os); + } + } + + class WriteableNewBrokerAck implements Writeable { + public void write(OutputStream os) throws IOUtilsException { + EncodingUtils.encodeToStream(encodingRegistry, new NewBrokerAck(), os); + } + } + + class WriteableRemoveBroker implements Writeable { + private RemoveBroker removeBroker; + + public WriteableRemoveBroker(EndpointReference brokerConsumerEpr, List<EndpointReference> neighborBrokerConsumerEprs) { + removeBroker = new RemoveBroker(); + BrokerConsumerReference brokerConsumerReference = new BrokerConsumerReference(); + brokerConsumerReference.setReference(brokerConsumerEpr); + removeBroker.setBrokerConsumerReference(brokerConsumerReference); + if (neighborBrokerConsumerEprs != null) { + NeighborBrokerConsumers neighborBrokerConsumers = new NeighborBrokerConsumers(); + neighborBrokerConsumers.setReferenceSequence(neighborBrokerConsumerEprs); + neighborBrokerConsumers.setSequenceType(Constants.BrokerConsumers); + removeBroker.setNeighborBrokerConsumers(neighborBrokerConsumers); + } + } + + public void write(OutputStream os) throws IOUtilsException { + EncodingUtils.encodeToStream(encodingRegistry, removeBroker, os); + } + } + + class WriteableReplaceBrokerConnection implements Writeable { + private ReplaceBrokerConnection replaceBrokerConnection; + + public WriteableReplaceBrokerConnection(EndpointReference removedBrokerEpr, List<Broker> brokerSequence) { + replaceBrokerConnection = new ReplaceBrokerConnection(); + RemovedBroker removedBroker = new RemovedBroker(); + removedBroker.setReference(removedBrokerEpr); + replaceBrokerConnection.setRemovedBroker(removedBroker); + if (brokerSequence != null) { + Neighbors neighbors = new Neighbors(); + neighbors.setBrokerSequence(brokerSequence); + replaceBrokerConnection.setNeighbors(neighbors); + } + } + + public void write(OutputStream os) throws IOUtilsException { + EncodingUtils.encodeToStream(encodingRegistry, replaceBrokerConnection, os); + } + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/AbstractBroker.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/AbstractBroker.java new file mode 100644 index 0000000000..a46ae13763 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/AbstractBroker.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class AbstractBroker implements EncodingObject { + + private BrokerProducerReference brokerProducerReference; + private BrokerConsumerReference brokerConsumerReference; + + public BrokerProducerReference getBrokerProducerReference() { + return this.brokerProducerReference; + } + + public void setBrokerProducerReference(BrokerProducerReference brokerProducerReference) { + this.brokerProducerReference = brokerProducerReference; + } + + public BrokerConsumerReference getBrokerConsumerReference() { + return this.brokerConsumerReference; + } + + public void setBrokerConsumerReference(BrokerConsumerReference brokerConsumerReference) { + this.brokerConsumerReference = brokerConsumerReference; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/AbstractBrokerEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/AbstractBrokerEnDeCoder.java new file mode 100644 index 0000000000..8dc5be9b16 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/AbstractBrokerEnDeCoder.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import static javax.xml.stream.XMLStreamConstants.END_ELEMENT; +import static javax.xml.stream.XMLStreamConstants.START_ELEMENT; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public abstract class AbstractBrokerEnDeCoder<B extends AbstractBroker> extends AbstractEnDeCoder<B> { + + public AbstractBrokerEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + public void encode(B encodingObject, XMLStreamWriter writer) throws EncodingException { + + try { + QName qName = getEncodingObjectQName(); + writer.writeStartElement(Constants.NOTIFICATION_PREFIX, qName.getLocalPart(), qName.getNamespaceURI()); + writer.writeNamespace(Constants.NOTIFICATION_PREFIX, qName.getNamespaceURI()); + registry.encode(encodingObject.getBrokerConsumerReference(), writer); + registry.encode(encodingObject.getBrokerProducerReference(), writer); + writer.writeEndElement(); + } catch(XMLStreamException e) { + throw new EncodingException(e); + } + } + + public B decode(XMLStreamReader reader) throws EncodingException { + + try { + B brokerElement = getEncodingObjectType().newInstance(); + boolean haveBCR = false; + boolean haveBPR = false; + while (true) { + switch (reader.next()) { + case START_ELEMENT: + EncodingObject encodingObject = registry.decode(reader); + if (encodingObject instanceof BrokerConsumerReference && !haveBCR) { + brokerElement.setBrokerConsumerReference((BrokerConsumerReference)encodingObject); + haveBCR = true; + } + else if(encodingObject instanceof BrokerProducerReference && !haveBPR) { + brokerElement.setBrokerProducerReference((BrokerProducerReference)encodingObject); + haveBPR = true; + } + else { + throw new EncodingException("Invalid encoding object"); + } + break; + case END_ELEMENT: + if (!haveBCR) { + throw new EncodingException("Missing broker consumer reference"); + } + if (!haveBPR) { + throw new EncodingException("Missing broker producer reference"); + } + return brokerElement; + } + } + } catch (Exception ex) { + throw new EncodingException(ex); + } + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/AbstractEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/AbstractEnDeCoder.java new file mode 100644 index 0000000000..015ccebaa9 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/AbstractEnDeCoder.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import javax.xml.namespace.QName; + +/** + * @version $Rev$ $Date$ + */ +public abstract class AbstractEnDeCoder<E extends EncodingObject> implements + EnDeCoder<E> { + + protected EncodingRegistry registry; + + protected AbstractEnDeCoder(EncodingRegistry registry) { + + this.registry = registry; + } + + public void start() { + Class<E> encodingType = getEncodingObjectType(); + QName encodingQName = getEncodingObjectQName(); + + registry.registerEnDeCoder(encodingType, encodingQName, this); + } + + public void stop() { + Class<E> encodingType = getEncodingObjectType(); + QName encodingQName = getEncodingObjectQName(); + + registry.unregisterEnDeCoder(encodingType, encodingQName); + } + +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Broker.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Broker.java new file mode 100644 index 0000000000..41610edc3b --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Broker.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class Broker extends AbstractBroker { +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerConsumerReference.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerConsumerReference.java new file mode 100644 index 0000000000..e80af5ea68 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerConsumerReference.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class BrokerConsumerReference extends EndpointReferenceWrapper { + +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerConsumerReferenceEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerConsumerReferenceEnDeCoder.java new file mode 100644 index 0000000000..7df1c21ca9 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerConsumerReferenceEnDeCoder.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import javax.xml.namespace.QName; + +/** + * @version $Rev$ $Date$ + */ +public class BrokerConsumerReferenceEnDeCoder extends EndpointReferenceWrapperEnDeCoder<BrokerConsumerReference> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.BrokerConsumerReference); + + public BrokerConsumerReferenceEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + + public QName getEncodingObjectQName() { + return QNAME; + } + + + public Class<BrokerConsumerReference> getEncodingObjectType() { + return BrokerConsumerReference.class; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerEnDeCoder.java new file mode 100644 index 0000000000..a4bee61947 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerEnDeCoder.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import javax.xml.namespace.QName; + +/** + * @version $Rev$ $Date$ + */ +public class BrokerEnDeCoder extends AbstractBrokerEnDeCoder<Broker> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.Broker); + + public BrokerEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + + public QName getEncodingObjectQName() { + + return QNAME; + } + + + public Class<Broker> getEncodingObjectType() { + + return Broker.class; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerID.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerID.java new file mode 100644 index 0000000000..aec7907441 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerID.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import java.util.UUID; + +/** + * @version $Rev$ $Date$ + */ +public class BrokerID implements EncodingObject { + + private String id; + + public String getID() { + return id; + } + + public void setID(String id) { + this.id = id; + } + + public static String generate() { + return UUID.randomUUID().toString(); + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerIDEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerIDEnDeCoder.java new file mode 100644 index 0000000000..68d9340abf --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerIDEnDeCoder.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import static javax.xml.stream.XMLStreamConstants.END_ELEMENT; +import static javax.xml.stream.XMLStreamConstants.START_ELEMENT; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamConstants; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public class BrokerIDEnDeCoder extends AbstractEnDeCoder<BrokerID> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.BrokerID); + + public BrokerIDEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + public void encode(BrokerID encodingObject, XMLStreamWriter writer) throws EncodingException { + + try { + writer.writeStartElement(Constants.NOTIFICATION_PREFIX, QNAME.getLocalPart(), QNAME.getNamespaceURI()); + writer.writeNamespace(Constants.NOTIFICATION_PREFIX, QNAME.getNamespaceURI()); + writer.writeCharacters(encodingObject.getID()); + writer.writeEndElement(); + } catch(XMLStreamException e) { + throw new EncodingException(e); + } + } + + public BrokerID decode(XMLStreamReader reader) throws EncodingException { + + try { + BrokerID brokerIDElement = new BrokerID(); + while (true) { + switch (reader.next()) { + case START_ELEMENT: + break; + case XMLStreamConstants.CHARACTERS: + if (reader.hasText()) { + String id = reader.getText(); + brokerIDElement.setID(id); + } + else { + throw new EncodingException("Broker ID missing value"); + } + break; + case END_ELEMENT: + return brokerIDElement; + } + } + } catch (XMLStreamException ex) { + throw new EncodingException(ex); + } + } + + + public QName getEncodingObjectQName() { + + return QNAME; + } + + + public Class<BrokerID> getEncodingObjectType() { + + return BrokerID.class; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerProducerReference.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerProducerReference.java new file mode 100644 index 0000000000..04d7aca0e6 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerProducerReference.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class BrokerProducerReference extends EndpointReferenceWrapper { + +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerProducerReferenceEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerProducerReferenceEnDeCoder.java new file mode 100644 index 0000000000..ea66e8f11b --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerProducerReferenceEnDeCoder.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import javax.xml.namespace.QName; + +/** + * @version $Rev$ $Date$ + */ +public class BrokerProducerReferenceEnDeCoder extends EndpointReferenceWrapperEnDeCoder<BrokerProducerReference> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.BrokerProducerReference); + + public BrokerProducerReferenceEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + + public QName getEncodingObjectQName() { + return QNAME; + } + + + public Class<BrokerProducerReference> getEncodingObjectType() { + return BrokerProducerReference.class; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Brokers.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Brokers.java new file mode 100644 index 0000000000..9b7eb149e0 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Brokers.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import java.util.ArrayList; +import java.util.List; + +/** + * @version $Rev$ $Date$ + */ +public class Brokers implements EncodingObject { + + private List<Broker> brokerSequence; + + public List<Broker> getBrokerSequence() { + return brokerSequence; + } + + public void addBrokerToSequence(Broker broker) { + if(this.brokerSequence == null) { + this.brokerSequence = new ArrayList<Broker>(); + } + this.brokerSequence.add(broker); + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokersEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokersEnDeCoder.java new file mode 100644 index 0000000000..1c3a0aaee8 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokersEnDeCoder.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import static javax.xml.stream.XMLStreamConstants.END_ELEMENT; +import static javax.xml.stream.XMLStreamConstants.START_ELEMENT; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public class BrokersEnDeCoder extends AbstractEnDeCoder<Brokers> { + + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.Brokers); + + public BrokersEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + public void encode(Brokers encodingObject, XMLStreamWriter writer) throws EncodingException { + + try { + QName qName = getEncodingObjectQName(); + writer.writeStartElement(Constants.NOTIFICATION_PREFIX, qName.getLocalPart(), qName.getNamespaceURI()); + writer.writeNamespace(Constants.NOTIFICATION_PREFIX, qName.getNamespaceURI()); + if (encodingObject.getBrokerSequence() != null) { + for (Broker broker : encodingObject.getBrokerSequence()) { + registry.encode(broker, writer); + } + } + writer.writeEndElement(); + } catch(XMLStreamException e) { + throw new EncodingException(e); + } + } + + public Brokers decode(XMLStreamReader reader) throws EncodingException { + + try { + Brokers brokersElement = new Brokers(); + while (true) { + switch (reader.next()) { + case START_ELEMENT: + EncodingObject encodingObject = registry.decode(reader); + brokersElement.addBrokerToSequence((Broker)encodingObject); + break; + case END_ELEMENT: + return brokersElement; + } + } + } catch (XMLStreamException ex) { + throw new EncodingException(ex); + } + } + + + public QName getEncodingObjectQName() { + + return QNAME; + } + + + public Class<Brokers> getEncodingObjectType() { + + return Brokers.class; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverride.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverride.java new file mode 100644 index 0000000000..eb1a89812d --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverride.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class ConnectionOverride implements EncodingObject { + + private BrokerConsumerReference brokerConsumerReference; + + public BrokerConsumerReference getBrokerConsumerReference() { + return this.brokerConsumerReference; + } + + public void setBrokerConsumerReference(BrokerConsumerReference brokerConsumerReference) { + this.brokerConsumerReference = brokerConsumerReference; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverrideEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverrideEnDeCoder.java new file mode 100644 index 0000000000..af20d02c00 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverrideEnDeCoder.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import static javax.xml.stream.XMLStreamConstants.END_ELEMENT; +import static javax.xml.stream.XMLStreamConstants.START_ELEMENT; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public class ConnectionOverrideEnDeCoder extends AbstractEnDeCoder<ConnectionOverride> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.ConnectionOverride); + + public ConnectionOverrideEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + public void encode(ConnectionOverride encodingObject, XMLStreamWriter writer) throws EncodingException { + + try { + writer.writeStartElement(Constants.NOTIFICATION_PREFIX, QNAME.getLocalPart(), QNAME.getNamespaceURI()); + writer.writeNamespace(Constants.NOTIFICATION_PREFIX, QNAME.getNamespaceURI()); + registry.encode(encodingObject.getBrokerConsumerReference(), writer); + writer.writeEndElement(); + } catch(XMLStreamException e) { + throw new EncodingException(e); + } + } + + public ConnectionOverride decode(XMLStreamReader reader) throws EncodingException { + + try { + ConnectionOverride connectionOverrideElement = new ConnectionOverride(); + while (true) { + switch (reader.next()) { + case START_ELEMENT: + EncodingObject encodingObject = registry.decode(reader); + connectionOverrideElement.setBrokerConsumerReference((BrokerConsumerReference)encodingObject); + break; + case END_ELEMENT: + return connectionOverrideElement; + } + } + } catch (Exception ex) { + throw new EncodingException(ex); + } + } + + + public QName getEncodingObjectQName() { + + return QNAME; + } + + + public Class<ConnectionOverride> getEncodingObjectType() { + + return ConnectionOverride.class; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverrideResponse.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverrideResponse.java new file mode 100644 index 0000000000..8dcf9fda58 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverrideResponse.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class ConnectionOverrideResponse implements EncodingObject { +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverrideResponseEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverrideResponseEnDeCoder.java new file mode 100644 index 0000000000..c5c8a99e5c --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverrideResponseEnDeCoder.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import static javax.xml.stream.XMLStreamConstants.END_ELEMENT; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public class ConnectionOverrideResponseEnDeCoder extends AbstractEnDeCoder<ConnectionOverrideResponse> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.ConnectionOverrideResponse); + + public ConnectionOverrideResponseEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + public void encode(ConnectionOverrideResponse encodingObject, XMLStreamWriter writer) throws EncodingException { + + try { + writer.writeStartElement(Constants.NOTIFICATION_PREFIX, QNAME.getLocalPart(), QNAME.getNamespaceURI()); + writer.writeNamespace(Constants.NOTIFICATION_PREFIX, QNAME.getNamespaceURI()); + writer.writeEndElement(); + } catch(XMLStreamException e) { + throw new EncodingException(e); + } + } + + public ConnectionOverrideResponse decode(XMLStreamReader reader) throws EncodingException { + + try { + ConnectionOverrideResponse connectionOverrideResponseElement = new ConnectionOverrideResponse(); + while (true) { + switch (reader.next()) { + case END_ELEMENT: + return connectionOverrideResponseElement; + } + } + } catch (Exception ex) { + throw new EncodingException(ex); + } + } + + + public QName getEncodingObjectQName() { + + return QNAME; + } + + + public Class<ConnectionOverrideResponse> getEncodingObjectType() { + + return ConnectionOverrideResponse.class; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Constants.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Constants.java new file mode 100644 index 0000000000..e92ffc1d0c --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Constants.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public interface Constants { + + String NOTIFICATION_NS = "http://docs.oasis-open.org/wsn/b-2"; + String NOTIFICATION_PREFIX = "wsnt"; + String ADDRESSING_NS = "http://schemas.xmlsoap.org/ws/2004/08/addressing"; + String ADDRESSING_PREFIX = "wsa"; + String Subscribe = "Subscribe"; + String ConsumerReference = "ConsumerReference"; + String Address = "Address"; + String ReferenceProperties = "ReferenceProperties"; + String EndpointReference = "EndpointReference"; + String BrokerID = "BrokerID"; + String NewConsumer = "NewConsumer"; + String NewProducer = "NewProducer"; + String NewConsumerResponse = "NewConsumerResponse"; + String NewProducerResponse = "NewProducerResponse"; + String ConsumerSequenceType = "ConsumerSequenceType"; + String EndConsumers = "EndConsumers"; + String BrokerConsumers = "BrokerConsumers"; + String NoConsumers = "NoConsumers"; + String ProducerSequenceType = "ProducerSequenceType"; + String EndProducers = "EndProducers"; + String BrokerProducers = "BrokerProducers"; + String NoProducers = "NoProducers"; + String Broker = "Broker"; + String NewBroker = "NewBroker"; + String NewBrokerAck = "NewBrokerAck"; + String BrokerConsumerReference = "BrokerConsumerReference"; + String BrokerProducerReference = "BrokerProducerReference"; + String NewBrokerResponse = "NewBrokerResponse"; + String FirstBroker = "FirstBroker"; + String Brokers = "Brokers"; + String ConnectionOverride = "ConnectionOverride"; + String ConnectionOverrideResponse = "ConnectionOverrideResponse"; + String NeighborBrokerConsumers = "NeighborBrokerConsumers"; + String RemoveBroker = "RemoveBroker"; + String RemovedBroker = "RemovedBroker"; + String Neighbors = "Neighbors"; + String ReplaceBrokerConnection = "ReplaceBrokerConnection"; + + String SUBSCRIBE_OP = "subscribe"; + String CONNECTION_OVERRIDE_OP = "connectionOverride"; + String NEW_CONSUMER_OP = "newConsumer"; + String NEW_PRODUCER_OP = "newProducer"; + String NEW_BROKER_OP = "newBroker"; + String NEW_BROKER_ACK_OP = "newBrokerAck"; + String REMOVE_BROKER_OP = "removeBroker"; + String REPLACE_BROKER_CONNECTION_OP = "replaceBrokerConnection"; + + String Broker_ID = "brokerID"; +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConsumerReference.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConsumerReference.java new file mode 100644 index 0000000000..c2c16ab5cd --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConsumerReference.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class ConsumerReference extends EndpointReferenceWrapper { + +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConsumerReferenceEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConsumerReferenceEnDeCoder.java new file mode 100644 index 0000000000..68099b40bb --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConsumerReferenceEnDeCoder.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import javax.xml.namespace.QName; + +/** + * @version $Rev$ $Date$ + */ +public class ConsumerReferenceEnDeCoder extends EndpointReferenceWrapperEnDeCoder<ConsumerReference> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.ConsumerReference); + + public ConsumerReferenceEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + + public QName getEncodingObjectQName() { + return QNAME; + } + + + public Class<ConsumerReference> getEncodingObjectType() { + return ConsumerReference.class; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/DefaultEncodingRegistry.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/DefaultEncodingRegistry.java new file mode 100644 index 0000000000..9533ea3062 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/DefaultEncodingRegistry.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public class DefaultEncodingRegistry implements EncodingRegistry { + + private final Map<Class<? extends EncodingObject>, EnDeCoder> encoderRegistry = + new ConcurrentHashMap<Class<? extends EncodingObject>, EnDeCoder>(); + + private final Map<QName, EnDeCoder> decoderRegistry = new ConcurrentHashMap<QName, EnDeCoder>(); + + public DefaultEncodingRegistry() { + } + + public <E extends EncodingObject> void registerEnDeCoder(Class<E> encodingClass, QName qname, EnDeCoder<E> enDeCoder) { + + encoderRegistry.put(encodingClass, enDeCoder); + decoderRegistry.put(qname, enDeCoder); + } + + public <E extends EncodingObject> void unregisterEnDeCoder(Class<E> encodingClass, QName qname) { + + encoderRegistry.remove(encodingClass); + decoderRegistry.remove(qname); + } + + @SuppressWarnings("unchecked") + public void encode(EncodingObject encodingObject, XMLStreamWriter writer) throws EncodingException { + + EnDeCoder encoder = encoderRegistry.get(encodingObject.getClass()); + if (encoder == null) { + throw new EncodingException("No encoder defined for " + encodingObject.getClass()); + } + encoder.encode(encodingObject, writer); + } + + public EncodingObject decode(XMLStreamReader reader) throws EncodingException { + + QName qname = reader.getName(); + + EnDeCoder decoder = decoderRegistry.get(qname); + if (decoder == null) { + throw new EncodingException("No decoder defined for " + qname); + } + return decoder.decode(reader); + } + + public void stop() { + encoderRegistry.clear(); + decoderRegistry.clear(); + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EnDeCoder.java new file mode 100644 index 0000000000..159357ed90 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EnDeCoder.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public interface EnDeCoder<E extends EncodingObject> { + + /** + * Encodes an object to the specified stream writer. + * + * @param encodingObject Object to be serialized. + * @param writer Stream writer to which the infoset is serialized. + * @throws EncodingException In case of any encoding error. + */ + void encode(E encodingObject, XMLStreamWriter writer) throws EncodingException; + + /** + * Decodes an XML stream to an object. + * + * @param reader XML stream from where the encoded XML is read. + * @return Encoding object. + * @throws EncodingException In case of any encoding error. + */ + E decode(XMLStreamReader reader) throws EncodingException; + + /** + * Gets the qualified name of the XML fragment for the Encoding + * object. + * + * @return Qualified name of the XML fragment. + */ + QName getEncodingObjectQName(); + + /** + * Returns the type of the encoding object. + * + * @return Encoding object type. + */ + Class<E> getEncodingObjectType(); +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingException.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingException.java new file mode 100644 index 0000000000..76e4999ddc --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingException.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +@SuppressWarnings("serial") +public class EncodingException extends Exception { + + /** + * + */ + private static final long serialVersionUID = 1L; + + /** + * Initializes the exception message. + * + * @param message Message for the exception. + */ + public EncodingException(String message) { + super(message); + } + + /** + * Initializes the root cause. + * + * @param cause Root cause for the exception. + */ + public EncodingException(Throwable cause) { + super(cause); + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingObject.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingObject.java new file mode 100644 index 0000000000..54bda38033 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingObject.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public interface EncodingObject { + +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingRegistry.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingRegistry.java new file mode 100644 index 0000000000..c0a324d663 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingRegistry.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public interface EncodingRegistry { + + /** + * Registers an en/de coder. + * + * @param <E> Encoding object type. + * @param encodingClass Encoding obejct class. + * @param qname Qualified name of the root element of the encoded XML. + * @param enDeCoder Encoding object enDeCoder. + */ + <E extends EncodingObject> void registerEnDeCoder(Class<E> encodingClass, QName qname, EnDeCoder<E> enDeCoder); + + <E extends EncodingObject> void unregisterEnDeCoder(Class<E> encodingClass, QName qname); + + /** + * Encodes an object. + * + * @param encodingObject Encoding object to be encoded. + * @param writer Writer to which encoded information is written. + */ + void encode(EncodingObject encodingObject, XMLStreamWriter writer) throws EncodingException; + + /** + * Decodes an XML stream to an encoding object. + * + * @param reader Reader from which encoded information is read. + * @return Encoding object from the encoded stream. + */ + EncodingObject decode(XMLStreamReader reader) throws EncodingException; +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingUtils.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingUtils.java new file mode 100644 index 0000000000..8318892e0f --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingUtils.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; + +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLOutputFactory; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +import org.apache.tuscany.sca.binding.notification.util.IOUtils.IOUtilsException; + +/** + * @version $Rev$ $Date$ + */ +public class EncodingUtils { + + private static XMLOutputFactory xof = XMLOutputFactory.newInstance(); + private static XMLInputFactory xif = XMLInputFactory.newInstance(); + + public static void encodeToStream(EncodingRegistry encodingRegistry, + EncodingObject eo, + OutputStream os) throws IOUtilsException { + try { + XMLStreamWriter writer = xof.createXMLStreamWriter(os); + encodingRegistry.encode(eo, writer); + writer.flush(); + writer.close(); + } + catch(Exception e) { + throw new IOUtilsException(e); + } + } + + public static EncodingObject decodeFromStream(EncodingRegistry encodingRegistry, + InputStream istream) throws EncodingException { + EncodingObject eo = null; + try { + XMLStreamReader reader = xif.createXMLStreamReader(istream); + reader.next(); + eo = encodingRegistry.decode(reader); + reader.close(); + } + catch(XMLStreamException e) { + throw new EncodingException(e); + } + + return eo; + } + + public static EndpointReference createEndpointReference(URL address, String brokerID) { + EndpointAddress epa = new EndpointAddress(); + epa.setAddress(address); + EndpointReference epr = new EndpointReference(); + epr.setEndpointAddress(epa); + if (brokerID != null) { + BrokerID bi = new BrokerID(); + bi.setID(brokerID); + ReferenceProperties rp = new ReferenceProperties(); + rp.addProperty(bi); + epr.setReferenceProperties(rp); + } + return epr; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndConsumers.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndConsumers.java new file mode 100644 index 0000000000..aaa9ddb7b5 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndConsumers.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class EndConsumers extends EndpointReferenceSequence { + +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndConsumersEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndConsumersEnDeCoder.java new file mode 100644 index 0000000000..26b8fd0357 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndConsumersEnDeCoder.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public class EndConsumersEnDeCoder extends EndpointReferenceSequenceEnDeCoder<EndConsumers> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.EndConsumers); + + public EndConsumersEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + + public QName getEncodingObjectQName() { + return QNAME; + } + + + public Class<EndConsumers> getEncodingObjectType() { + return EndConsumers.class; + } + + @Override + protected void encodeSequenceTypeAttribute(EndConsumers encodingObject, XMLStreamWriter writer) throws EncodingException { + try { + writer.writeAttribute(Constants.ConsumerSequenceType, encodingObject.getSequenceType()); + } catch(XMLStreamException e) { + throw new EncodingException(e); + } + } + + @Override + protected String decodeSequenceTypeAttribute(XMLStreamReader reader) { + return reader.getAttributeValue(null, Constants.ConsumerSequenceType); + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndProducers.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndProducers.java new file mode 100644 index 0000000000..b546f8c550 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndProducers.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class EndProducers extends EndpointReferenceSequence { + +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndProducersEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndProducersEnDeCoder.java new file mode 100644 index 0000000000..1039a3df9d --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndProducersEnDeCoder.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public class EndProducersEnDeCoder extends EndpointReferenceSequenceEnDeCoder<EndProducers> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.EndProducers); + + public EndProducersEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + + public QName getEncodingObjectQName() { + return QNAME; + } + + + public Class<EndProducers> getEncodingObjectType() { + return EndProducers.class; + } + + @Override + protected void encodeSequenceTypeAttribute(EndProducers encodingObject, XMLStreamWriter writer) throws EncodingException { + try { + writer.writeAttribute(Constants.ProducerSequenceType, encodingObject.getSequenceType()); + } catch(XMLStreamException e) { + throw new EncodingException(e); + } + } + + @Override + protected String decodeSequenceTypeAttribute(XMLStreamReader reader) { + return reader.getAttributeValue(null, Constants.ProducerSequenceType); + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointAddress.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointAddress.java new file mode 100644 index 0000000000..25eb336c05 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointAddress.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import java.net.URL; + +/** + * @version $Rev$ $Date$ + */ +public class EndpointAddress implements EncodingObject { + + private URL address; + + public URL getAddress() { + return address; + } + + public void setAddress(URL address) { + this.address = address; + } + + public void setAddress(String addressText) throws EncodingException { + try { + this.address = new URL(addressText); + } catch(Exception e) { + throw new EncodingException(e); + } + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointAddressEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointAddressEnDeCoder.java new file mode 100644 index 0000000000..76f2e8ec81 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointAddressEnDeCoder.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import static javax.xml.stream.XMLStreamConstants.END_ELEMENT; +import static javax.xml.stream.XMLStreamConstants.START_ELEMENT; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamConstants; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public class EndpointAddressEnDeCoder extends AbstractEnDeCoder<EndpointAddress> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.ADDRESSING_NS, Constants.Address); + + public EndpointAddressEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + public void encode(EndpointAddress encodingObject, XMLStreamWriter writer) throws EncodingException { + + try { + writer.writeStartElement(Constants.ADDRESSING_PREFIX, QNAME.getLocalPart(), QNAME.getNamespaceURI()); + writer.writeNamespace(Constants.ADDRESSING_PREFIX, QNAME.getNamespaceURI()); + writer.writeCharacters(encodingObject.getAddress().toString()); + writer.writeEndElement(); + } catch(XMLStreamException e) { + throw new EncodingException(e); + } + } + + public EndpointAddress decode(XMLStreamReader reader) throws EncodingException { + + try { + EndpointAddress endpointAddressElement = new EndpointAddress(); + while (true) { + switch (reader.next()) { + case START_ELEMENT: + break; + case XMLStreamConstants.CHARACTERS: + if (reader.hasText()) { + String address = reader.getText(); + endpointAddressElement.setAddress(address); + } + else { + throw new EncodingException("Endpoint address is missing address"); + } + break; + case END_ELEMENT: + return endpointAddressElement; + } + } + } catch (XMLStreamException ex) { + throw new EncodingException(ex); + } + } + + + public QName getEncodingObjectQName() { + + return QNAME; + } + + + public Class<EndpointAddress> getEncodingObjectType() { + + return EndpointAddress.class; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReference.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReference.java new file mode 100644 index 0000000000..fa73da9a19 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReference.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class EndpointReference implements EncodingObject { + + private EndpointAddress endpointAddress; + private ReferenceProperties referenceProperties; + + public EndpointAddress getEndpointAddress() { + return this.endpointAddress; + } + + public void setEndpointAddress(EndpointAddress endpointAddress) { + this.endpointAddress = endpointAddress; + } + + public ReferenceProperties getReferenceProperties() { + return this.referenceProperties; + } + + public void setReferenceProperties(ReferenceProperties referenceProperties) { + this.referenceProperties = referenceProperties; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceEnDeCoder.java new file mode 100644 index 0000000000..8a05440319 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceEnDeCoder.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import static javax.xml.stream.XMLStreamConstants.END_ELEMENT; +import static javax.xml.stream.XMLStreamConstants.START_ELEMENT; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public class EndpointReferenceEnDeCoder extends AbstractEnDeCoder<EndpointReference> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.ADDRESSING_NS, Constants.EndpointReference); + + public EndpointReferenceEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + public void encode(EndpointReference encodingObject, XMLStreamWriter writer) throws EncodingException { + + try { + writer.writeStartElement(Constants.ADDRESSING_PREFIX, QNAME.getLocalPart(), QNAME.getNamespaceURI()); + writer.writeNamespace(Constants.ADDRESSING_PREFIX, QNAME.getNamespaceURI()); + registry.encode(encodingObject.getEndpointAddress(), writer); + if (encodingObject.getReferenceProperties() != null) { + registry.encode(encodingObject.getReferenceProperties(), writer); + } + writer.writeEndElement(); + } catch(XMLStreamException e) { + throw new EncodingException(e); + } + } + + public EndpointReference decode(XMLStreamReader reader) throws EncodingException { + + try { + EndpointReference endpointReferenceElement = new EndpointReference(); + boolean haveEPA = false; + boolean haveRP = false; + while (true) { + switch (reader.next()) { + case START_ELEMENT: + EncodingObject encodingObject = registry.decode(reader); + if (encodingObject instanceof EndpointAddress && !haveEPA) { + endpointReferenceElement.setEndpointAddress((EndpointAddress)encodingObject); + haveEPA = true; + } + else if(encodingObject instanceof ReferenceProperties && !haveRP) { + endpointReferenceElement.setReferenceProperties((ReferenceProperties)encodingObject); + haveRP = true; + } + else { + throw new EncodingException("Invalid encoding object"); + } + break; + case END_ELEMENT: + if (!haveEPA) { + throw new EncodingException("Missing endpoint address"); + } + return endpointReferenceElement; + } + } + } catch (Exception ex) { + throw new EncodingException(ex); + } + } + + + public QName getEncodingObjectQName() { + + return QNAME; + } + + + public Class<EndpointReference> getEncodingObjectType() { + + return EndpointReference.class; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceSequence.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceSequence.java new file mode 100644 index 0000000000..39d53bca8d --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceSequence.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import java.util.ArrayList; +import java.util.List; + +/** + * @version $Rev$ $Date$ + */ +public class EndpointReferenceSequence implements EncodingObject { + + private List<EndpointReference> referenceSequence; + private String sequenceType; + + public List<EndpointReference> getReferenceSequence() { + return referenceSequence; + } + + public void addReferenceToSequence(EndpointReference address) { + if(this.referenceSequence == null) { + this.referenceSequence = new ArrayList<EndpointReference>(); + } + this.referenceSequence.add(address); + } + + public void setReferenceSequence(List<EndpointReference> referenceSequence) { + this.referenceSequence = referenceSequence; + } + + public String getSequenceType() { + return this.sequenceType; + } + + public void setSequenceType(String sequenceType) { + this.sequenceType = sequenceType; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceSequenceEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceSequenceEnDeCoder.java new file mode 100644 index 0000000000..13d686b803 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceSequenceEnDeCoder.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import static javax.xml.stream.XMLStreamConstants.END_ELEMENT; +import static javax.xml.stream.XMLStreamConstants.START_ELEMENT; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public abstract class EndpointReferenceSequenceEnDeCoder<ERS extends EndpointReferenceSequence> extends AbstractEnDeCoder<ERS> { + + public EndpointReferenceSequenceEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + public void encode(ERS encodingObject, XMLStreamWriter writer) throws EncodingException { + + try { + QName qName = getEncodingObjectQName(); + writer.writeStartElement(Constants.NOTIFICATION_PREFIX, qName.getLocalPart(), qName.getNamespaceURI()); + writer.writeNamespace(Constants.NOTIFICATION_PREFIX, qName.getNamespaceURI()); + encodeSequenceTypeAttribute(encodingObject, writer); + if (encodingObject.getReferenceSequence() != null) { + for (EndpointReference endpointReference : encodingObject.getReferenceSequence()) { + registry.encode(endpointReference, writer); + } + } + writer.writeEndElement(); + } catch(XMLStreamException e) { + throw new EncodingException(e); + } + } + + protected abstract void encodeSequenceTypeAttribute(ERS encodingObject, XMLStreamWriter writer) throws EncodingException; + + public ERS decode(XMLStreamReader reader) throws EncodingException { + + try { + ERS endpointReferenceSequenceElement = null; + try { + endpointReferenceSequenceElement = getEncodingObjectType().newInstance(); + } catch(Exception e) { + throw new EncodingException(e); + } + String sequenceType = decodeSequenceTypeAttribute(reader); + endpointReferenceSequenceElement.setSequenceType(sequenceType); + while (true) { + switch (reader.next()) { + case START_ELEMENT: + EncodingObject encodingObject = registry.decode(reader); + endpointReferenceSequenceElement.addReferenceToSequence((EndpointReference)encodingObject); + break; + case END_ELEMENT: + return endpointReferenceSequenceElement; + } + } + } catch (XMLStreamException ex) { + throw new EncodingException(ex); + } + } + + protected abstract String decodeSequenceTypeAttribute(XMLStreamReader reader); +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceWrapper.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceWrapper.java new file mode 100644 index 0000000000..f1f1a7737b --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceWrapper.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class EndpointReferenceWrapper implements EncodingObject { + + private EndpointReference reference; + + public EndpointReference getReference() { + return reference; + } + + public void setReference(EndpointReference reference) { + this.reference = reference; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceWrapperEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceWrapperEnDeCoder.java new file mode 100644 index 0000000000..aeafce46e0 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceWrapperEnDeCoder.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import static javax.xml.stream.XMLStreamConstants.END_ELEMENT; +import static javax.xml.stream.XMLStreamConstants.START_ELEMENT; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public abstract class EndpointReferenceWrapperEnDeCoder<ERW extends EndpointReferenceWrapper> extends AbstractEnDeCoder<ERW> { + + public EndpointReferenceWrapperEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + public void encode(ERW encodingObject, XMLStreamWriter writer) throws EncodingException { + + try { + QName qName = getEncodingObjectQName(); + writer.writeStartElement(Constants.NOTIFICATION_PREFIX, qName.getLocalPart(), qName.getNamespaceURI()); + writer.writeNamespace(Constants.NOTIFICATION_PREFIX, qName.getNamespaceURI()); + registry.encode(encodingObject.getReference(), writer); + writer.writeEndElement(); + } catch(XMLStreamException e) { + throw new EncodingException(e); + } + } + + public ERW decode(XMLStreamReader reader) throws EncodingException { + + try { + ERW endpointReferenceWrapperElement = null; + try { + endpointReferenceWrapperElement = getEncodingObjectType().newInstance(); + } catch(Exception e) { + throw new EncodingException(e); + } + while (true) { + switch (reader.next()) { + case START_ELEMENT: + EncodingObject encodingObject = registry.decode(reader); + endpointReferenceWrapperElement.setReference((EndpointReference)encodingObject); + break; + case END_ELEMENT: + return endpointReferenceWrapperElement; + } + } + } catch (XMLStreamException ex) { + throw new EncodingException(ex); + } + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NeighborBrokerConsumers.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NeighborBrokerConsumers.java new file mode 100644 index 0000000000..5871d370fb --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NeighborBrokerConsumers.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class NeighborBrokerConsumers extends EndpointReferenceSequence { + +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NeighborBrokerConsumersEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NeighborBrokerConsumersEnDeCoder.java new file mode 100644 index 0000000000..ba52687e16 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NeighborBrokerConsumersEnDeCoder.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public class NeighborBrokerConsumersEnDeCoder extends EndpointReferenceSequenceEnDeCoder<NeighborBrokerConsumers> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.NeighborBrokerConsumers); + + public NeighborBrokerConsumersEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + + public QName getEncodingObjectQName() { + return QNAME; + } + + + public Class<NeighborBrokerConsumers> getEncodingObjectType() { + return NeighborBrokerConsumers.class; + } + + @Override + protected void encodeSequenceTypeAttribute(NeighborBrokerConsumers encodingObject, XMLStreamWriter writer) throws EncodingException { + try { + writer.writeAttribute(Constants.ConsumerSequenceType, encodingObject.getSequenceType()); + } catch(XMLStreamException e) { + throw new EncodingException(e); + } + } + + @Override + protected String decodeSequenceTypeAttribute(XMLStreamReader reader) { + return reader.getAttributeValue(null, Constants.ConsumerSequenceType); + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Neighbors.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Neighbors.java new file mode 100644 index 0000000000..0aec2da465 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Neighbors.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import java.util.ArrayList; +import java.util.List; + +/** + * @version $Rev$ $Date$ + */ +public class Neighbors implements EncodingObject { + + private List<Broker> brokerSequence; + + public List<Broker> getBrokerSequence() { + return brokerSequence; + } + + public void addBrokerToSequence(Broker broker) { + if(this.brokerSequence == null) { + this.brokerSequence = new ArrayList<Broker>(); + } + this.brokerSequence.add(broker); + } + + public void setBrokerSequence(List<Broker> brokerSequence) { + this.brokerSequence = brokerSequence; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NeighborsEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NeighborsEnDeCoder.java new file mode 100644 index 0000000000..53f56b6626 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NeighborsEnDeCoder.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import static javax.xml.stream.XMLStreamConstants.END_ELEMENT; +import static javax.xml.stream.XMLStreamConstants.START_ELEMENT; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public class NeighborsEnDeCoder extends AbstractEnDeCoder<Neighbors> { + + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.Neighbors); + + public NeighborsEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + public void encode(Neighbors encodingObject, XMLStreamWriter writer) throws EncodingException { + + try { + QName qName = getEncodingObjectQName(); + writer.writeStartElement(Constants.NOTIFICATION_PREFIX, qName.getLocalPart(), qName.getNamespaceURI()); + writer.writeNamespace(Constants.NOTIFICATION_PREFIX, qName.getNamespaceURI()); + if (encodingObject.getBrokerSequence() != null) { + for (Broker broker : encodingObject.getBrokerSequence()) { + registry.encode(broker, writer); + } + } + writer.writeEndElement(); + } catch(XMLStreamException e) { + throw new EncodingException(e); + } + } + + public Neighbors decode(XMLStreamReader reader) throws EncodingException { + + try { + Neighbors neighborsElement = new Neighbors(); + while (true) { + switch (reader.next()) { + case START_ELEMENT: + EncodingObject encodingObject = registry.decode(reader); + neighborsElement.addBrokerToSequence((Broker)encodingObject); + break; + case END_ELEMENT: + return neighborsElement; + } + } + } catch (XMLStreamException ex) { + throw new EncodingException(ex); + } + } + + + public QName getEncodingObjectQName() { + + return QNAME; + } + + + public Class<Neighbors> getEncodingObjectType() { + + return Neighbors.class; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBroker.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBroker.java new file mode 100644 index 0000000000..1684d0c3ee --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBroker.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class NewBroker extends AbstractBroker { +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerAck.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerAck.java new file mode 100644 index 0000000000..f4702806e0 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerAck.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class NewBrokerAck implements EncodingObject { +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerAckEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerAckEnDeCoder.java new file mode 100644 index 0000000000..fcd0717688 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerAckEnDeCoder.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import static javax.xml.stream.XMLStreamConstants.END_ELEMENT; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public class NewBrokerAckEnDeCoder extends AbstractEnDeCoder<NewBrokerAck> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.NewBrokerAck); + + public NewBrokerAckEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + public void encode(NewBrokerAck encodingObject, XMLStreamWriter writer) throws EncodingException { + + try { + writer.writeStartElement(Constants.NOTIFICATION_PREFIX, QNAME.getLocalPart(), QNAME.getNamespaceURI()); + writer.writeNamespace(Constants.NOTIFICATION_PREFIX, QNAME.getNamespaceURI()); + writer.writeEndElement(); + } catch(XMLStreamException e) { + throw new EncodingException(e); + } + } + + public NewBrokerAck decode(XMLStreamReader reader) throws EncodingException { + + try { + NewBrokerAck newBrokerAck = new NewBrokerAck(); + while (true) { + switch (reader.next()) { + case END_ELEMENT: + return newBrokerAck; + } + } + } catch (Exception ex) { + throw new EncodingException(ex); + } + } + + + public QName getEncodingObjectQName() { + + return QNAME; + } + + + public Class<NewBrokerAck> getEncodingObjectType() { + + return NewBrokerAck.class; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerEnDeCoder.java new file mode 100644 index 0000000000..db65ba7491 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerEnDeCoder.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import javax.xml.namespace.QName; + +/** + * @version $Rev$ $Date$ + */ +public class NewBrokerEnDeCoder extends AbstractBrokerEnDeCoder<NewBroker> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.NewBroker); + + public NewBrokerEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + + public QName getEncodingObjectQName() { + + return QNAME; + } + + + public Class<NewBroker> getEncodingObjectType() { + + return NewBroker.class; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerResponse.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerResponse.java new file mode 100644 index 0000000000..72aa20af3d --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerResponse.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class NewBrokerResponse implements EncodingObject { + + private EndProducers endProducers; + private EndConsumers endConsumers; + private Brokers brokers; + private boolean firstBroker; + + public EndProducers getEndProducers() { + return this.endProducers; + } + + public void setEndProducers(EndProducers endProducers) { + this.endProducers = endProducers; + } + + public EndConsumers getEndConsumers() { + return this.endConsumers; + } + + public void setEndConsumers(EndConsumers endConsumers) { + this.endConsumers = endConsumers; + } + + public Brokers getBrokers() { + return this.brokers; + } + + public void setBrokers(Brokers brokers) { + this.brokers = brokers; + } + + public boolean isFirstBroker() { + return this.firstBroker; + } + + public void setFirstBroker(boolean firstBroker) { + this.firstBroker = firstBroker; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerResponseEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerResponseEnDeCoder.java new file mode 100644 index 0000000000..391820444f --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerResponseEnDeCoder.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import static javax.xml.stream.XMLStreamConstants.END_ELEMENT; +import static javax.xml.stream.XMLStreamConstants.START_ELEMENT; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public class NewBrokerResponseEnDeCoder extends AbstractEnDeCoder<NewBrokerResponse> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.NewBrokerResponse); + + public NewBrokerResponseEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + public void encode(NewBrokerResponse encodingObject, XMLStreamWriter writer) throws EncodingException { + + try { + QName qName = getEncodingObjectQName(); + writer.writeStartElement(Constants.NOTIFICATION_PREFIX, qName.getLocalPart(), qName.getNamespaceURI()); + writer.writeNamespace(Constants.NOTIFICATION_PREFIX, qName.getNamespaceURI()); + writer.writeAttribute(Constants.FirstBroker, String.valueOf(encodingObject.isFirstBroker())); + if (encodingObject.isFirstBroker()) { + registry.encode(encodingObject.getEndConsumers(), writer); + registry.encode(encodingObject.getEndProducers(), writer); + } + else { + registry.encode(encodingObject.getBrokers(), writer); + } + writer.writeEndElement(); + } catch(XMLStreamException e) { + throw new EncodingException(e); + } + } + + public NewBrokerResponse decode(XMLStreamReader reader) throws EncodingException { + + try { + NewBrokerResponse newBrokerResponseElement = new NewBrokerResponse(); + boolean firstBroker = Boolean.parseBoolean(reader.getAttributeValue(null, Constants.FirstBroker)); + newBrokerResponseElement.setFirstBroker(firstBroker); + boolean haveEC = false; + boolean haveEP = false; + boolean haveB = false; + while (true) { + switch (reader.next()) { + case START_ELEMENT: + EncodingObject encodingObject = registry.decode(reader); + if (encodingObject instanceof EndProducers && !haveEP && firstBroker) { + newBrokerResponseElement.setEndProducers((EndProducers)encodingObject); + haveEP = true; + } + else if(encodingObject instanceof EndConsumers && !haveEC && firstBroker) { + newBrokerResponseElement.setEndConsumers((EndConsumers)encodingObject); + haveEC = true; + } + else if(encodingObject instanceof Brokers && !haveB && !firstBroker) { + newBrokerResponseElement.setBrokers((Brokers)encodingObject); + haveB = true; + } + else { + throw new EncodingException("Invalid encoding object"); + } + break; + case END_ELEMENT: + if (!haveEP && firstBroker) { + throw new EncodingException("Missing end producers"); + } + if (!haveEC && firstBroker) { + throw new EncodingException("Missing end consumers"); + } + if (!haveB && !firstBroker) { + throw new EncodingException("Missing brokers"); + } + return newBrokerResponseElement; + } + } + } catch (Exception ex) { + throw new EncodingException(ex); + } + } + + + public QName getEncodingObjectQName() { + + return QNAME; + } + + + public Class<NewBrokerResponse> getEncodingObjectType() { + + return NewBrokerResponse.class; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumer.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumer.java new file mode 100644 index 0000000000..2c0c846841 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumer.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class NewConsumer extends EndpointReferenceWrapper { + +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumerEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumerEnDeCoder.java new file mode 100644 index 0000000000..8314d628ba --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumerEnDeCoder.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import javax.xml.namespace.QName; + +/** + * @version $Rev$ $Date$ + */ +public class NewConsumerEnDeCoder extends EndpointReferenceWrapperEnDeCoder<NewConsumer> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.NewConsumer); + + public NewConsumerEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + + public QName getEncodingObjectQName() { + return QNAME; + } + + + public Class<NewConsumer> getEncodingObjectType() { + return NewConsumer.class; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumerResponse.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumerResponse.java new file mode 100644 index 0000000000..b89bfa2470 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumerResponse.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class NewConsumerResponse extends EndpointReferenceSequence { + +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumerResponseEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumerResponseEnDeCoder.java new file mode 100644 index 0000000000..1abb187271 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumerResponseEnDeCoder.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public class NewConsumerResponseEnDeCoder extends EndpointReferenceSequenceEnDeCoder<NewConsumerResponse> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.NewConsumerResponse); + + public NewConsumerResponseEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + + public QName getEncodingObjectQName() { + return QNAME; + } + + + public Class<NewConsumerResponse> getEncodingObjectType() { + return NewConsumerResponse.class; + } + + @Override + protected void encodeSequenceTypeAttribute(NewConsumerResponse encodingObject, XMLStreamWriter writer) throws EncodingException { + try { + writer.writeAttribute(Constants.ProducerSequenceType, encodingObject.getSequenceType()); + } catch(XMLStreamException e) { + throw new EncodingException(e); + } + } + + @Override + protected String decodeSequenceTypeAttribute(XMLStreamReader reader) { + return reader.getAttributeValue(null, Constants.ProducerSequenceType); + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducer.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducer.java new file mode 100644 index 0000000000..f12a4582dd --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducer.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class NewProducer extends EndpointReferenceWrapper { + +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducerEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducerEnDeCoder.java new file mode 100644 index 0000000000..75ee8aea6f --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducerEnDeCoder.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import javax.xml.namespace.QName; + +/** + * @version $Rev$ $Date$ + */ +public class NewProducerEnDeCoder extends EndpointReferenceWrapperEnDeCoder<NewProducer> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.NewProducer); + + public NewProducerEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + + public QName getEncodingObjectQName() { + return QNAME; + } + + + public Class<NewProducer> getEncodingObjectType() { + return NewProducer.class; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducerResponse.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducerResponse.java new file mode 100644 index 0000000000..91bf371549 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducerResponse.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class NewProducerResponse extends EndpointReferenceSequence { + +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducerResponseEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducerResponseEnDeCoder.java new file mode 100644 index 0000000000..34dee40761 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducerResponseEnDeCoder.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public class NewProducerResponseEnDeCoder extends EndpointReferenceSequenceEnDeCoder<NewProducerResponse> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.NewProducerResponse); + + public NewProducerResponseEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + + public QName getEncodingObjectQName() { + return QNAME; + } + + + public Class<NewProducerResponse> getEncodingObjectType() { + return NewProducerResponse.class; + } + + @Override + protected void encodeSequenceTypeAttribute(NewProducerResponse encodingObject, XMLStreamWriter writer) throws EncodingException { + try { + writer.writeAttribute(Constants.ConsumerSequenceType, encodingObject.getSequenceType()); + } catch(XMLStreamException e) { + throw new EncodingException(e); + } + } + + @Override + protected String decodeSequenceTypeAttribute(XMLStreamReader reader) { + return reader.getAttributeValue(null, Constants.ConsumerSequenceType); + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReferenceProperties.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReferenceProperties.java new file mode 100644 index 0000000000..0f661717c0 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReferenceProperties.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import java.util.ArrayList; +import java.util.List; + +/** + * @version $Rev$ $Date$ + */ +public class ReferenceProperties implements EncodingObject { + + private List<EncodingObject> properties; + + public List<EncodingObject> getProperties() { + return properties; + } + + public void addProperty(EncodingObject property) { + if(this.properties == null) { + this.properties = new ArrayList<EncodingObject>(); + } + this.properties.add(property); + } + + @SuppressWarnings("unchecked") + public <E extends EncodingObject> E getProperty(Class<E> propertyType) { + if (this.properties == null) { + return null; + } + for (EncodingObject eo : properties) { + if (propertyType.isInstance(eo)) { + return (E)eo; + } + } + return null; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReferencePropertiesEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReferencePropertiesEnDeCoder.java new file mode 100644 index 0000000000..6ffbd8724e --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReferencePropertiesEnDeCoder.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import static javax.xml.stream.XMLStreamConstants.END_ELEMENT; +import static javax.xml.stream.XMLStreamConstants.START_ELEMENT; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public class ReferencePropertiesEnDeCoder extends AbstractEnDeCoder<ReferenceProperties> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.ADDRESSING_NS, Constants.ReferenceProperties); + + public ReferencePropertiesEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + public void encode(ReferenceProperties encodingObject, XMLStreamWriter writer) throws EncodingException { + + try { + writer.writeStartElement(Constants.ADDRESSING_PREFIX, QNAME.getLocalPart(), QNAME.getNamespaceURI()); + writer.writeNamespace(Constants.ADDRESSING_PREFIX, QNAME.getNamespaceURI()); + if (encodingObject.getProperties() != null) { + for (EncodingObject property : encodingObject.getProperties()) { + registry.encode(property, writer); + } + } + writer.writeEndElement(); + } catch(XMLStreamException e) { + throw new EncodingException(e); + } + } + + public ReferenceProperties decode(XMLStreamReader reader) throws EncodingException { + + try { + ReferenceProperties referencePropertiesElement = new ReferenceProperties(); + while (true) { + switch (reader.next()) { + case START_ELEMENT: + EncodingObject property = registry.decode(reader); + referencePropertiesElement.addProperty(property); + break; + case END_ELEMENT: + return referencePropertiesElement; + } + } + } catch (XMLStreamException ex) { + throw new EncodingException(ex); + } + } + + + public QName getEncodingObjectQName() { + + return QNAME; + } + + + public Class<ReferenceProperties> getEncodingObjectType() { + + return ReferenceProperties.class; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemoveBroker.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemoveBroker.java new file mode 100644 index 0000000000..0047f2b6cb --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemoveBroker.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class RemoveBroker implements EncodingObject { + + private BrokerConsumerReference brokerConsumerReference; + private NeighborBrokerConsumers neighborBrokerConsumers; + + public BrokerConsumerReference getBrokerConsumerReference() { + return this.brokerConsumerReference; + } + + public void setBrokerConsumerReference(BrokerConsumerReference brokerConsumerReference) { + this.brokerConsumerReference = brokerConsumerReference; + } + + public NeighborBrokerConsumers getNeighborBrokerConsumers() { + return this.neighborBrokerConsumers; + } + + public void setNeighborBrokerConsumers(NeighborBrokerConsumers neighborBrokerConsumers) { + this.neighborBrokerConsumers = neighborBrokerConsumers; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemoveBrokerEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemoveBrokerEnDeCoder.java new file mode 100644 index 0000000000..56e6c361c3 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemoveBrokerEnDeCoder.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import static javax.xml.stream.XMLStreamConstants.END_ELEMENT; +import static javax.xml.stream.XMLStreamConstants.START_ELEMENT; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public class RemoveBrokerEnDeCoder extends AbstractEnDeCoder<RemoveBroker> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.RemoveBroker); + + public RemoveBrokerEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + public void encode(RemoveBroker encodingObject, XMLStreamWriter writer) throws EncodingException { + + try { + writer.writeStartElement(Constants.NOTIFICATION_PREFIX, QNAME.getLocalPart(), QNAME.getNamespaceURI()); + writer.writeNamespace(Constants.NOTIFICATION_PREFIX, QNAME.getNamespaceURI()); + registry.encode(encodingObject.getBrokerConsumerReference(), writer); + if (encodingObject.getNeighborBrokerConsumers() != null) { + registry.encode(encodingObject.getNeighborBrokerConsumers(), writer); + } + writer.writeEndElement(); + } catch(XMLStreamException e) { + throw new EncodingException(e); + } + } + + public RemoveBroker decode(XMLStreamReader reader) throws EncodingException { + + try { + RemoveBroker removeBrokerElement = new RemoveBroker(); + boolean haveBCR = false; + boolean haveNBC = false; + while (true) { + switch (reader.next()) { + case START_ELEMENT: + EncodingObject encodingObject = registry.decode(reader); + if (encodingObject instanceof BrokerConsumerReference && !haveBCR) { + removeBrokerElement.setBrokerConsumerReference((BrokerConsumerReference)encodingObject); + haveBCR = true; + } + else if(encodingObject instanceof NeighborBrokerConsumers && !haveNBC) { + removeBrokerElement.setNeighborBrokerConsumers((NeighborBrokerConsumers)encodingObject); + haveNBC = true; + } + else { + throw new EncodingException("Invalid encoding object"); + } + break; + case END_ELEMENT: + if (!haveBCR) { + throw new EncodingException("Missing broker consumer reference"); + } + return removeBrokerElement; + } + } + } catch (Exception ex) { + throw new EncodingException(ex); + } + } + + + public QName getEncodingObjectQName() { + + return QNAME; + } + + + public Class<RemoveBroker> getEncodingObjectType() { + + return RemoveBroker.class; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemovedBroker.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemovedBroker.java new file mode 100644 index 0000000000..40795987d0 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemovedBroker.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class RemovedBroker extends EndpointReferenceWrapper { + +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemovedBrokerEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemovedBrokerEnDeCoder.java new file mode 100644 index 0000000000..3f0ddc546c --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemovedBrokerEnDeCoder.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import javax.xml.namespace.QName; + +/** + * @version $Rev$ $Date$ + */ +public class RemovedBrokerEnDeCoder extends EndpointReferenceWrapperEnDeCoder<RemovedBroker> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.RemovedBroker); + + public RemovedBrokerEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + + public QName getEncodingObjectQName() { + return QNAME; + } + + + public Class<RemovedBroker> getEncodingObjectType() { + return RemovedBroker.class; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReplaceBrokerConnection.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReplaceBrokerConnection.java new file mode 100644 index 0000000000..68bc368a53 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReplaceBrokerConnection.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class ReplaceBrokerConnection implements EncodingObject { + + private RemovedBroker removedBroker; + private Neighbors neighbors; + + public RemovedBroker getRemovedBroker() { + return this.removedBroker; + } + + public void setRemovedBroker(RemovedBroker removedBroker) { + this.removedBroker = removedBroker; + } + + public Neighbors getNeighbors() { + return this.neighbors; + } + + public void setNeighbors(Neighbors neighbors) { + this.neighbors = neighbors; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReplaceBrokerConnectionEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReplaceBrokerConnectionEnDeCoder.java new file mode 100644 index 0000000000..057d686905 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReplaceBrokerConnectionEnDeCoder.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import static javax.xml.stream.XMLStreamConstants.END_ELEMENT; +import static javax.xml.stream.XMLStreamConstants.START_ELEMENT; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public class ReplaceBrokerConnectionEnDeCoder extends AbstractEnDeCoder<ReplaceBrokerConnection> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.ReplaceBrokerConnection); + + public ReplaceBrokerConnectionEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + public void encode(ReplaceBrokerConnection encodingObject, XMLStreamWriter writer) throws EncodingException { + + try { + writer.writeStartElement(Constants.NOTIFICATION_PREFIX, QNAME.getLocalPart(), QNAME.getNamespaceURI()); + writer.writeNamespace(Constants.NOTIFICATION_PREFIX, QNAME.getNamespaceURI()); + registry.encode(encodingObject.getRemovedBroker(), writer); + if (encodingObject.getNeighbors() != null) { + registry.encode(encodingObject.getNeighbors(), writer); + } + writer.writeEndElement(); + } catch(XMLStreamException e) { + throw new EncodingException(e); + } + } + + public ReplaceBrokerConnection decode(XMLStreamReader reader) throws EncodingException { + + try { + ReplaceBrokerConnection replaceBrokerConnectionElement = new ReplaceBrokerConnection(); + boolean haveRB = false; + boolean haveN = false; + while (true) { + switch (reader.next()) { + case START_ELEMENT: + EncodingObject encodingObject = registry.decode(reader); + if (encodingObject instanceof RemovedBroker && !haveRB) { + replaceBrokerConnectionElement.setRemovedBroker((RemovedBroker)encodingObject); + haveRB = true; + } + else if(encodingObject instanceof Neighbors && !haveN) { + replaceBrokerConnectionElement.setNeighbors((Neighbors)encodingObject); + haveN = true; + } + else { + throw new EncodingException("Invalid encoding object"); + } + break; + case END_ELEMENT: + if (!haveRB) { + throw new EncodingException("Missing removed broker"); + } + return replaceBrokerConnectionElement; + } + } + } catch (Exception ex) { + throw new EncodingException(ex); + } + } + + + public QName getEncodingObjectQName() { + + return QNAME; + } + + + public Class<ReplaceBrokerConnection> getEncodingObjectType() { + + return ReplaceBrokerConnection.class; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Subscribe.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Subscribe.java new file mode 100644 index 0000000000..d87c9d4eb5 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Subscribe.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +/** + * @version $Rev$ $Date$ + */ +public class Subscribe implements EncodingObject { + + private ConsumerReference consumerReference; + + public ConsumerReference getConsumerReference() { + return consumerReference; + } + + public void setConsumerReference(ConsumerReference consumerReference) { + this.consumerReference = consumerReference; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/SubscribeEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/SubscribeEnDeCoder.java new file mode 100644 index 0000000000..8f5d368e05 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/SubscribeEnDeCoder.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import static javax.xml.stream.XMLStreamConstants.END_ELEMENT; +import static javax.xml.stream.XMLStreamConstants.START_ELEMENT; + +import javax.xml.namespace.QName; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +/** + * @version $Rev$ $Date$ + */ +public class SubscribeEnDeCoder extends AbstractEnDeCoder<Subscribe> { + + // QName for the root element + public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.Subscribe); + + public SubscribeEnDeCoder(EncodingRegistry registry) { + super(registry); + } + + public void encode(Subscribe encodingObject, XMLStreamWriter writer) throws EncodingException { + + try { + writer.writeStartElement(Constants.NOTIFICATION_PREFIX, QNAME.getLocalPart(), QNAME.getNamespaceURI()); + writer.writeNamespace(Constants.NOTIFICATION_PREFIX, QNAME.getNamespaceURI()); + registry.encode(encodingObject.getConsumerReference(), writer); + writer.writeEndElement(); + } catch(XMLStreamException e) { + throw new EncodingException(e); + } + } + + public Subscribe decode(XMLStreamReader reader) throws EncodingException { + + try { + Subscribe subscribeElement = new Subscribe(); + while (true) { + switch (reader.next()) { + case START_ELEMENT: + EncodingObject encodingObject = registry.decode(reader); + subscribeElement.setConsumerReference((ConsumerReference)encodingObject); + break; + case END_ELEMENT: + return subscribeElement; + } + } + } catch (XMLStreamException ex) { + throw new EncodingException(ex); + } + } + + + public QName getEncodingObjectQName() { + + return QNAME; + } + + + public Class<Subscribe> getEncodingObjectType() { + + return Subscribe.class; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/util/IOUtils.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/util/IOUtils.java new file mode 100644 index 0000000000..a45c6807e6 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/util/IOUtils.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.util; + +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; + +/** + * @version $Rev$ $Date$ + */ +public class IOUtils { + + // FIXME: For some reason, tomcat converts the header names to be lower case, see TUSCANY-1791 + public static final String Notification_Source = "notification-source"; + public static final String Notification_Target = "notification-target"; + public static final String Notification_Operation = "notification-operation"; + + public static final int DEF_BLOCK_SIZE = 512; + + public static Object sendHttpRequest(URL targetURL, + String opName, + Writeable wbody, + ReadableContinuation rcont) throws Exception { + if (opName == null) { + opName = ""; + } + Map<String, String> headers = new HashMap<String, String>(); + headers.put(Notification_Operation, opName); + return sendHttpRequest(targetURL, headers, wbody, rcont); + } + + public static Object sendHttpRequest(URL targetURL, + Map<String, String> headers, + Writeable wbody, + ReadableContinuation rcont) throws Exception { + + String targetUri = targetURL.toString(); + String sourceUri = ""; + + final HttpURLConnection con = (HttpURLConnection) targetURL.openConnection(); + con.setRequestMethod("POST"); + //con.setRequestProperty("Content-Length", Integer.toString(sbody.getBytes().length)); + con.setAllowUserInteraction(false); + con.setInstanceFollowRedirects(false); + if (targetUri != null) { + con.setRequestProperty(Notification_Target, targetUri); + } + + if (sourceUri != null) { + con.setRequestProperty(Notification_Source, sourceUri); + } + + for (String key : headers.keySet()) { + con.setRequestProperty(key, headers.get(key)); + } + con.setDoOutput(true); + con.setDoInput(true); + con.connect(); + Object response = null; + try { + if (wbody != null) { + OutputStream ost = con.getOutputStream(); + wbody.write(ost); + } + else { + throw new IOUtilsException("Missing writeable body"); + } + final int rc = con.getResponseCode(); + switch (rc) { + case HttpURLConnection.HTTP_OK: + if (rcont != null) { + InputStream ist = con.getInputStream(); + response = rcont.read(ist); + } + break; + case HttpURLConnection.HTTP_NO_CONTENT: + break; + default: + throw new RuntimeException("Unexpected response code: " + rc); + } + } + finally + { + con.disconnect(); + } + return response; + } + + public interface Writeable { + void write(OutputStream os) throws IOUtilsException; + } + + public interface ReadableContinuation { + Object read(InputStream is) throws IOUtilsException; + } + + @SuppressWarnings("serial") + public static class IOUtilsException extends Exception { + + /** + * + */ + private static final long serialVersionUID = 1L; + + public IOUtilsException(String message) { + super(message); + } + + public IOUtilsException(Throwable cause) { + super(cause); + } + } + + public static byte [] readFully(final InputStream ist, int len) throws IOException { + ByteArrayOutputStream baost = new ByteArrayOutputStream(); + copyStream(ist,baost,len); + return baost.toByteArray(); + } + + public static int copyStream(final InputStream ist, final OutputStream ost) throws IOException { + return copyStream(ist, ost, -1, 0); + } + + public static int copyStream(final InputStream ist, final OutputStream ost, int length) throws IOException { + return copyStream(ist, ost, length, 0); + } + + public static int copyStream(final InputStream ist, final OutputStream ost, final int length, int blockSize) throws IOException { + + int cbCopied = 0; + if (blockSize <= 0) { + blockSize = DEF_BLOCK_SIZE; + } + + final byte[] block = new byte[blockSize]; + boolean done = length == 0; + while (!done) { + try { + // determine how many bytes to read + final int cbToRead = length == -1 ? block.length : (Math.min(length - cbCopied, block.length)); + final int cbRead = ist.read(block, 0, cbToRead); + if (cbRead == -1) { + done = true; + } + else { + ost.write(block, 0, cbRead); + cbCopied += cbRead; + done = cbCopied == length; + } + } catch (final EOFException e) { + done = true; + } + } + ost.flush(); + return cbCopied; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/util/NotificationServlet.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/util/NotificationServlet.java new file mode 100644 index 0000000000..046914e9db --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/util/NotificationServlet.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.util; + +import java.io.IOException; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; + +import javax.servlet.ServletInputStream; +import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + + +/** + * Receives notification in HTTP request and dispatches it down the wire + * + * @version $Rev$ $Date$ + */ +@SuppressWarnings("serial") +public class NotificationServlet extends HttpServlet { + + /** + * + */ + private static final long serialVersionUID = 1L; + private NotificationServletHandler handler; + private NotificationServletStreamHandler servletStreamHandler; + + public NotificationServlet(NotificationServletHandler handler) { + this.handler = handler; + this.servletStreamHandler = null; + } + + public NotificationServlet(NotificationServletStreamHandler servletStreamHandler) { + this.handler = null; + this.servletStreamHandler = servletStreamHandler; + } + + @Override + public void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException { + HashMap<String, String> headers = new HashMap<String, String>(); + Enumeration headerNames = request.getHeaderNames(); + while (headerNames.hasMoreElements()) { + String headerName = (String)headerNames.nextElement(); + headers.put(headerName, request.getHeader(headerName)); + } + if (handler != null) { + byte[] requestBody = IOUtils.readFully(request.getInputStream(), request.getContentLength()); + byte[] handlersResponse = handler.handle(headers, requestBody); + if (handlersResponse != null) { + response.getOutputStream().write(handlersResponse); + response.getOutputStream().flush(); + } + } + else { + try { + servletStreamHandler.handle(headers, request.getInputStream(), request.getContentLength(), response.getOutputStream()); + } + catch(RuntimeException e) { + e.printStackTrace(); + } + } + } + + public interface NotificationServletHandler { + public byte[] handle(Map<String, String> headers, byte[] payload); + } + + public interface NotificationServletStreamHandler { + public void handle(Map<String, String> headers, ServletInputStream istream, int contentLength, ServletOutputStream ostream); + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/util/URIUtil.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/util/URIUtil.java new file mode 100644 index 0000000000..a7f26003b0 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/util/URIUtil.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.util; + +import java.net.URI; + +/** + * @version $Rev$ $Date$ + */ +public class URIUtil { + + public static String getPath(URI uri) { + String path = null; + + if (uri.isOpaque()) { + path = "/" + uri.getSchemeSpecificPart(); + } + else if (uri.isAbsolute()) { + path = uri.getPath(); + } else { + path = "/" + uri.getPath(); + } + + return path; + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/resources/META-INF/services/org.apache.tuscany.sca.core.ModuleActivator b/branches/sca-java-1.2.1/modules/binding-notification/src/main/resources/META-INF/services/org.apache.tuscany.sca.core.ModuleActivator new file mode 100644 index 0000000000..5e5ce97054 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/resources/META-INF/services/org.apache.tuscany.sca.core.ModuleActivator @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# Implementation class for the ExtensionActivator
+org.apache.tuscany.sca.binding.notification.NotificationBindingModuleActivator
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/test/java/org/apache/tuscany/sca/binding/notification/encoding/AxiomTestCase.java b/branches/sca-java-1.2.1/modules/binding-notification/src/test/java/org/apache/tuscany/sca/binding/notification/encoding/AxiomTestCase.java new file mode 100644 index 0000000000..a7e864cb55 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/test/java/org/apache/tuscany/sca/binding/notification/encoding/AxiomTestCase.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import java.io.ByteArrayInputStream; +import java.io.StringWriter; + +import junit.framework.TestCase; + +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.impl.builder.StAXOMBuilder; +import org.junit.Assert; + +public class AxiomTestCase extends TestCase { + + private static String wsnt = "http://docs.oasis-open.org/wsn/b-2"; + private static String wsa = "http://schemas.xmlsoap.org/ws/2004/08/addressing"; + private static String testUrl1 = "http://localhost:8081/test"; + private static String testUrl2 = "http://localhost:8082/test"; + private static String testNewProducerResponse = + "<wsnt:NewProducerResponse xmlns:wsnt=\"" + wsnt + "\" ConsumerSequenceType=\"EndConsumers\">" + + "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl1 + "</wsa:Address>" + + "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl2 + "</wsa:Address>" + + "</wsnt:NewProducerResponse>"; + + public void testAxiom() { + try { + StAXOMBuilder builder = new StAXOMBuilder(new ByteArrayInputStream(testNewProducerResponse.getBytes())); + OMElement element = builder.getDocumentElement(); + Assert.assertNotNull(element); + + StringWriter sw = new StringWriter(); + element.serialize(sw); + sw.flush(); + Assert.assertEquals(sw.toString(),testNewProducerResponse); + } + catch(Throwable e) { + e.printStackTrace(); + } + } +} diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/test/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingTestCase.java b/branches/sca-java-1.2.1/modules/binding-notification/src/test/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingTestCase.java new file mode 100644 index 0000000000..4f51175407 --- /dev/null +++ b/branches/sca-java-1.2.1/modules/binding-notification/src/test/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingTestCase.java @@ -0,0 +1,470 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.notification.encoding; + +import java.io.StringReader; +import java.io.StringWriter; +import java.util.Iterator; + +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLOutputFactory; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; + +import junit.framework.Assert; +import junit.framework.TestCase; + +/** + * @version $Rev$ $Date$ + */ +public class EncodingTestCase extends TestCase { + + private static String wsnt = "http://docs.oasis-open.org/wsn/b-2"; + private static String wsa = "http://schemas.xmlsoap.org/ws/2004/08/addressing"; + private static String testUrl = "http://localhost:8080/test"; + private static String testUrl1 = "http://localhost:8081/test"; + private static String testUrl2 = "http://localhost:8082/test"; + private static String bid1 = "UUID1"; + private static String bid2 = "UUID2"; + private static String testSubscribe = + "<wsnt:Subscribe xmlns:wsnt=\"" + wsnt + "\">" + + "<wsnt:ConsumerReference xmlns:wsnt=\"" + wsnt + "\">" + + "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" + + "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl + "</wsa:Address>" + + "</wsa:EndpointReference>" + + "</wsnt:ConsumerReference>" + + "</wsnt:Subscribe>"; + private static String testNewConsumerResponse = + "<wsnt:NewConsumerResponse xmlns:wsnt=\"" + wsnt + "\" ProducerSequenceType=\"EndProducers\">" + + "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" + + "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl + "</wsa:Address>" + + "</wsa:EndpointReference>" + + "</wsnt:NewConsumerResponse>"; + private static String testNewProducerResponse = + "<wsnt:NewProducerResponse xmlns:wsnt=\"" + wsnt + "\" ConsumerSequenceType=\"EndConsumers\">" + + "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" + + "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl1 + "</wsa:Address>" + + "</wsa:EndpointReference>" + + "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" + + "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl2 + "</wsa:Address>" + + "</wsa:EndpointReference>" + + "</wsnt:NewProducerResponse>"; + private static String testNoProducersResponse = + "<wsnt:NewConsumerResponse xmlns:wsnt=\"" + wsnt + "\" ProducerSequenceType=\"NoProducers\" />"; + private static String testNewBroker = + "<wsnt:NewBroker xmlns:wsnt=\"" + wsnt + "\">" + + "<wsnt:BrokerConsumerReference xmlns:wsnt=\"" + wsnt + "\">" + + "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" + + "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl1 + "</wsa:Address>" + + "<wsa:ReferenceProperties xmlns:wsa=\"" + wsa + "\">" + + "<wsnt:BrokerID xmlns:wsnt=\"" + wsnt + "\">" + bid1 + "</wsnt:BrokerID>" + + "</wsa:ReferenceProperties>" + + "</wsa:EndpointReference>" + + "</wsnt:BrokerConsumerReference>" + + "<wsnt:BrokerProducerReference xmlns:wsnt=\"" + wsnt + "\">" + + "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" + + "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl2 + "</wsa:Address>" + + "<wsa:ReferenceProperties xmlns:wsa=\"" + wsa + "\">" + + "<wsnt:BrokerID xmlns:wsnt=\"" + wsnt + "\">" + bid2 + "</wsnt:BrokerID>" + + "</wsa:ReferenceProperties>" + + "</wsa:EndpointReference>" + + "</wsnt:BrokerProducerReference>" + + "</wsnt:NewBroker>"; + private static String testNewBrokerResponse1 = + "<wsnt:NewBrokerResponse xmlns:wsnt=\"" + wsnt + "\" FirstBroker=\"true\">" + + "<wsnt:EndConsumers xmlns:wsnt=\"" + wsnt + "\" ConsumerSequenceType=\"EndConsumers\">" + + "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" + + "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl1 + "</wsa:Address>" + + "</wsa:EndpointReference>" + + "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" + + "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl2 + "</wsa:Address>" + + "</wsa:EndpointReference>" + + "</wsnt:EndConsumers>" + + "<wsnt:EndProducers xmlns:wsnt=\"" + wsnt + "\" ProducerSequenceType=\"NoProducers\" />" + + "</wsnt:NewBrokerResponse>"; + private static String testNewBrokerResponse2 = + "<wsnt:NewBrokerResponse xmlns:wsnt=\"" + wsnt + "\" FirstBroker=\"false\">" + + "<wsnt:Brokers xmlns:wsnt=\"" + wsnt + "\">" + + "<wsnt:Broker xmlns:wsnt=\"" + wsnt + "\">" + + "<wsnt:BrokerConsumerReference xmlns:wsnt=\"" + wsnt + "\">" + + "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" + + "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl1 + "</wsa:Address>" + + "<wsa:ReferenceProperties xmlns:wsa=\"" + wsa + "\">" + + "<wsnt:BrokerID xmlns:wsnt=\"" + wsnt + "\">" + bid1 + "</wsnt:BrokerID>" + + "</wsa:ReferenceProperties>" + + "</wsa:EndpointReference>" + + "</wsnt:BrokerConsumerReference>" + + "<wsnt:BrokerProducerReference xmlns:wsnt=\"" + wsnt + "\">" + + "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" + + "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl2 + "</wsa:Address>" + + "<wsa:ReferenceProperties xmlns:wsa=\"" + wsa + "\">" + + "<wsnt:BrokerID xmlns:wsnt=\"" + wsnt + "\">" + bid2 + "</wsnt:BrokerID>" + + "</wsa:ReferenceProperties>" + + "</wsa:EndpointReference>" + + "</wsnt:BrokerProducerReference>" + + "</wsnt:Broker>" + + "</wsnt:Brokers>" + + "</wsnt:NewBrokerResponse>"; + private static String testRemoveBroker = + "<wsnt:RemoveBroker xmlns:wsnt=\"" + wsnt + "\">" + + "<wsnt:BrokerConsumerReference xmlns:wsnt=\"" + wsnt + "\">" + + "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" + + "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl + "</wsa:Address>" + + "<wsa:ReferenceProperties xmlns:wsa=\"" + wsa + "\">" + + "<wsnt:BrokerID xmlns:wsnt=\"" + wsnt + "\">" + bid1 + "</wsnt:BrokerID>" + + "</wsa:ReferenceProperties>" + + "</wsa:EndpointReference>" + + "</wsnt:BrokerConsumerReference>" + + "<wsnt:NeighborBrokerConsumers xmlns:wsnt=\"" + wsnt + "\" ConsumerSequenceType=\"BrokerConsumers\">" + + "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" + + "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl1 + "</wsa:Address>" + + "</wsa:EndpointReference>" + + "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" + + "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl2 + "</wsa:Address>" + + "</wsa:EndpointReference>" + + "</wsnt:NeighborBrokerConsumers>" + + "</wsnt:RemoveBroker>"; + private static String testReplaceBrokerConnection = + "<wsnt:ReplaceBrokerConnection xmlns:wsnt=\"" + wsnt + "\">" + + "<wsnt:RemovedBroker xmlns:wsnt=\"" + wsnt + "\">" + + "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" + + "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl + "</wsa:Address>" + + "<wsa:ReferenceProperties xmlns:wsa=\"" + wsa + "\">" + + "<wsnt:BrokerID xmlns:wsnt=\"" + wsnt + "\">" + bid1 + "</wsnt:BrokerID>" + + "</wsa:ReferenceProperties>" + + "</wsa:EndpointReference>" + + "</wsnt:RemovedBroker>" + + "<wsnt:Neighbors xmlns:wsnt=\"" + wsnt + "\">" + + "<wsnt:Broker xmlns:wsnt=\"" + wsnt + "\">" + + "<wsnt:BrokerConsumerReference xmlns:wsnt=\"" + wsnt + "\">" + + "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" + + "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl1 + "</wsa:Address>" + + "<wsa:ReferenceProperties xmlns:wsa=\"" + wsa + "\">" + + "<wsnt:BrokerID xmlns:wsnt=\"" + wsnt + "\">" + bid1 + "</wsnt:BrokerID>" + + "</wsa:ReferenceProperties>" + + "</wsa:EndpointReference>" + + "</wsnt:BrokerConsumerReference>" + + "<wsnt:BrokerProducerReference xmlns:wsnt=\"" + wsnt + "\">" + + "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" + + "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl2 + "</wsa:Address>" + + "<wsa:ReferenceProperties xmlns:wsa=\"" + wsa + "\">" + + "<wsnt:BrokerID xmlns:wsnt=\"" + wsnt + "\">" + bid2 + "</wsnt:BrokerID>" + + "</wsa:ReferenceProperties>" + + "</wsa:EndpointReference>" + + "</wsnt:BrokerProducerReference>" + + "</wsnt:Broker>" + + "</wsnt:Neighbors>" + + "</wsnt:ReplaceBrokerConnection>"; + + public void testSubscribe() throws Exception { + DefaultEncodingRegistry der = new DefaultEncodingRegistry(); + SubscribeEnDeCoder sed = new SubscribeEnDeCoder(der); + sed.start(); + ConsumerReferenceEnDeCoder cred = new ConsumerReferenceEnDeCoder(der); + cred.start(); + EndpointReferenceEnDeCoder epred = new EndpointReferenceEnDeCoder(der); + epred.start(); + EndpointAddressEnDeCoder eaed = new EndpointAddressEnDeCoder(der); + eaed.start(); + + XMLInputFactory xif = XMLInputFactory.newInstance(); + XMLStreamReader reader = xif.createXMLStreamReader(new StringReader(testSubscribe)); + reader.next(); + Subscribe subscribe = (Subscribe)der.decode(reader); + Assert.assertEquals(subscribe.getConsumerReference().getReference().getEndpointAddress().getAddress().toString(), testUrl); + + XMLOutputFactory xof = XMLOutputFactory.newInstance(); + StringWriter testWriter = new StringWriter(); + XMLStreamWriter writer = xof.createXMLStreamWriter(testWriter); + der.encode(subscribe, writer); + writer.flush(); + String encoded = testWriter.toString(); + Assert.assertEquals(encoded, testSubscribe); + } + + public void testNewConsumerResponse() throws Exception { + DefaultEncodingRegistry der = new DefaultEncodingRegistry(); + NewConsumerResponseEnDeCoder ncred = new NewConsumerResponseEnDeCoder(der); + ncred.start(); + EndpointReferenceEnDeCoder epred = new EndpointReferenceEnDeCoder(der); + epred.start(); + EndpointAddressEnDeCoder eaed = new EndpointAddressEnDeCoder(der); + eaed.start(); + + XMLInputFactory xif = XMLInputFactory.newInstance(); + XMLStreamReader reader = xif.createXMLStreamReader(new StringReader(testNewConsumerResponse)); + reader.next(); + NewConsumerResponse newConsumerResponse = (NewConsumerResponse)der.decode(reader); + Assert.assertEquals(newConsumerResponse.getSequenceType(), "EndProducers"); + Assert.assertEquals(newConsumerResponse.getReferenceSequence().iterator().next().getEndpointAddress().getAddress().toString(), + testUrl); + + XMLOutputFactory xof = XMLOutputFactory.newInstance(); + StringWriter testWriter = new StringWriter(); + XMLStreamWriter writer = xof.createXMLStreamWriter(testWriter); + der.encode(newConsumerResponse, writer); + writer.flush(); + String encoded = testWriter.toString(); + Assert.assertEquals(encoded, testNewConsumerResponse); + } + + public void testNoProducersResponse() throws Exception { + DefaultEncodingRegistry der = new DefaultEncodingRegistry(); + NewConsumerResponseEnDeCoder ncred = new NewConsumerResponseEnDeCoder(der); + ncred.start(); + EndpointReferenceEnDeCoder epred = new EndpointReferenceEnDeCoder(der); + epred.start(); + EndpointAddressEnDeCoder eaed = new EndpointAddressEnDeCoder(der); + eaed.start(); + + XMLInputFactory xif = XMLInputFactory.newInstance(); + XMLStreamReader reader = xif.createXMLStreamReader(new StringReader(testNoProducersResponse)); + reader.next(); + NewConsumerResponse newConsumerResponse = (NewConsumerResponse)der.decode(reader); + Assert.assertEquals(newConsumerResponse.getSequenceType(), "NoProducers"); + + XMLOutputFactory xof = XMLOutputFactory.newInstance(); + StringWriter testWriter = new StringWriter(); + XMLStreamWriter writer = xof.createXMLStreamWriter(testWriter); + der.encode(newConsumerResponse, writer); + writer.flush(); + String encoded = testWriter.toString(); + Assert.assertEquals(encoded, testNoProducersResponse); + } + + public void testNewProducerResponse() throws Exception { + DefaultEncodingRegistry der = new DefaultEncodingRegistry(); + NewProducerResponseEnDeCoder npred = new NewProducerResponseEnDeCoder(der); + npred.start(); + EndpointReferenceEnDeCoder epred = new EndpointReferenceEnDeCoder(der); + epred.start(); + EndpointAddressEnDeCoder eaed = new EndpointAddressEnDeCoder(der); + eaed.start(); + + XMLInputFactory xif = XMLInputFactory.newInstance(); + XMLStreamReader reader = xif.createXMLStreamReader(new StringReader(testNewProducerResponse)); + reader.next(); + NewProducerResponse newProducerResponse = (NewProducerResponse)der.decode(reader); + Assert.assertEquals(newProducerResponse.getSequenceType(), "EndConsumers"); + Iterator<EndpointReference> it = newProducerResponse.getReferenceSequence().iterator(); + it.next(); + Assert.assertEquals(it.next().getEndpointAddress().getAddress().toString(), testUrl2); + + XMLOutputFactory xof = XMLOutputFactory.newInstance(); + StringWriter testWriter = new StringWriter(); + XMLStreamWriter writer = xof.createXMLStreamWriter(testWriter); + der.encode(newProducerResponse, writer); + writer.flush(); + String encoded = testWriter.toString(); + Assert.assertEquals(encoded, testNewProducerResponse); + } + + public void testNewBroker() throws Exception { + DefaultEncodingRegistry der = new DefaultEncodingRegistry(); + NewBrokerEnDeCoder nbed = new NewBrokerEnDeCoder(der); + nbed.start(); + BrokerConsumerReferenceEnDeCoder bcred = new BrokerConsumerReferenceEnDeCoder(der); + bcred.start(); + EndpointReferenceEnDeCoder epred = new EndpointReferenceEnDeCoder(der); + epred.start(); + EndpointAddressEnDeCoder eaed = new EndpointAddressEnDeCoder(der); + eaed.start(); + ReferencePropertiesEnDeCoder rped = new ReferencePropertiesEnDeCoder(der); + rped.start(); + BrokerIDEnDeCoder bied = new BrokerIDEnDeCoder(der); + bied.start(); + BrokerProducerReferenceEnDeCoder bpred = new BrokerProducerReferenceEnDeCoder(der); + bpred.start(); + + XMLInputFactory xif = XMLInputFactory.newInstance(); + XMLStreamReader reader = xif.createXMLStreamReader(new StringReader(testNewBroker)); + reader.next(); + NewBroker newBroker = (NewBroker)der.decode(reader); + Assert.assertEquals(newBroker.getBrokerConsumerReference().getReference().getEndpointAddress().getAddress().toString(), + testUrl1); + Assert.assertEquals(newBroker.getBrokerProducerReference().getReference().getEndpointAddress().getAddress().toString(), + testUrl2); + + XMLOutputFactory xof = XMLOutputFactory.newInstance(); + StringWriter testWriter = new StringWriter(); + XMLStreamWriter writer = xof.createXMLStreamWriter(testWriter); + der.encode(newBroker, writer); + writer.flush(); + String encoded = testWriter.toString(); + Assert.assertEquals(encoded, testNewBroker); + } + + public void testNewBrokerRespnse1() throws Exception { + DefaultEncodingRegistry der = new DefaultEncodingRegistry(); + NewBrokerResponseEnDeCoder nbred = new NewBrokerResponseEnDeCoder(der); + nbred.start(); + EndProducersEnDeCoder epred = new EndProducersEnDeCoder(der); + epred.start(); + EndConsumersEnDeCoder ecred = new EndConsumersEnDeCoder(der); + ecred.start(); + EndpointReferenceEnDeCoder ered = new EndpointReferenceEnDeCoder(der); + ered.start(); + EndpointAddressEnDeCoder eaed = new EndpointAddressEnDeCoder(der); + eaed.start(); + + XMLInputFactory xif = XMLInputFactory.newInstance(); + XMLStreamReader reader = xif.createXMLStreamReader(new StringReader(testNewBrokerResponse1)); + reader.next(); + NewBrokerResponse newBrokerResponse = (NewBrokerResponse)der.decode(reader); + Assert.assertFalse(!newBrokerResponse.isFirstBroker()); + Assert.assertEquals(newBrokerResponse.getEndProducers().getSequenceType(), "NoProducers"); + Assert.assertEquals(newBrokerResponse.getEndConsumers().getSequenceType(), "EndConsumers"); + Assert.assertEquals(newBrokerResponse.getEndConsumers().getReferenceSequence().get(0).getEndpointAddress().getAddress().toString(), + testUrl1); + + XMLOutputFactory xof = XMLOutputFactory.newInstance(); + StringWriter testWriter = new StringWriter(); + XMLStreamWriter writer = xof.createXMLStreamWriter(testWriter); + der.encode(newBrokerResponse, writer); + writer.flush(); + String encoded = testWriter.toString(); + Assert.assertEquals(encoded, testNewBrokerResponse1); + } + + public void testNewBrokerRespnse2() throws Exception { + DefaultEncodingRegistry der = new DefaultEncodingRegistry(); + NewBrokerResponseEnDeCoder nbred = new NewBrokerResponseEnDeCoder(der); + nbred.start(); + BrokersEnDeCoder bsed = new BrokersEnDeCoder(der); + bsed.start(); + BrokerEnDeCoder bed = new BrokerEnDeCoder(der); + bed.start(); + BrokerConsumerReferenceEnDeCoder bcred = new BrokerConsumerReferenceEnDeCoder(der); + bcred.start(); + BrokerProducerReferenceEnDeCoder bpred = new BrokerProducerReferenceEnDeCoder(der); + bpred.start(); + EndpointReferenceEnDeCoder epred = new EndpointReferenceEnDeCoder(der); + epred.start(); + EndpointAddressEnDeCoder eaed = new EndpointAddressEnDeCoder(der); + eaed.start(); + ReferencePropertiesEnDeCoder rped = new ReferencePropertiesEnDeCoder(der); + rped.start(); + BrokerIDEnDeCoder bied = new BrokerIDEnDeCoder(der); + bied.start(); + + XMLInputFactory xif = XMLInputFactory.newInstance(); + XMLStreamReader reader = xif.createXMLStreamReader(new StringReader(testNewBrokerResponse2)); + reader.next(); + NewBrokerResponse newBrokerResponse = (NewBrokerResponse)der.decode(reader); + Assert.assertFalse(newBrokerResponse.isFirstBroker()); + Assert.assertEquals(newBrokerResponse.getBrokers().getBrokerSequence().get(0) + .getBrokerConsumerReference().getReference().getEndpointAddress().getAddress().toString(), + testUrl1); + Assert.assertEquals(newBrokerResponse.getBrokers().getBrokerSequence().get(0) + .getBrokerProducerReference().getReference().getEndpointAddress().getAddress().toString(), + testUrl2); + + XMLOutputFactory xof = XMLOutputFactory.newInstance(); + StringWriter testWriter = new StringWriter(); + XMLStreamWriter writer = xof.createXMLStreamWriter(testWriter); + der.encode(newBrokerResponse, writer); + writer.flush(); + String encoded = testWriter.toString(); + Assert.assertEquals(encoded, testNewBrokerResponse2); + } + + public void testRemoveBroker() throws Exception { + DefaultEncodingRegistry der = new DefaultEncodingRegistry(); + RemoveBrokerEnDeCoder rbed = new RemoveBrokerEnDeCoder(der); + rbed.start(); + BrokerConsumerReferenceEnDeCoder bcred = new BrokerConsumerReferenceEnDeCoder(der); + bcred.start(); + EndpointReferenceEnDeCoder epred = new EndpointReferenceEnDeCoder(der); + epred.start(); + EndpointAddressEnDeCoder eaed = new EndpointAddressEnDeCoder(der); + eaed.start(); + ReferencePropertiesEnDeCoder rped = new ReferencePropertiesEnDeCoder(der); + rped.start(); + BrokerIDEnDeCoder bied = new BrokerIDEnDeCoder(der); + bied.start(); + NeighborBrokerConsumersEnDeCoder nbced = new NeighborBrokerConsumersEnDeCoder(der); + nbced.start(); + + XMLInputFactory xif = XMLInputFactory.newInstance(); + XMLStreamReader reader = xif.createXMLStreamReader(new StringReader(testRemoveBroker)); + reader.next(); + RemoveBroker removeBroker = (RemoveBroker)der.decode(reader); + Assert.assertEquals(removeBroker.getBrokerConsumerReference().getReference().getEndpointAddress().getAddress().toString(), + testUrl); + NeighborBrokerConsumers neighborBrokerConsumers = removeBroker.getNeighborBrokerConsumers(); + Assert.assertEquals(neighborBrokerConsumers.getSequenceType(), "BrokerConsumers"); + Iterator<EndpointReference> it = neighborBrokerConsumers.getReferenceSequence().iterator(); + it.next(); + Assert.assertEquals(it.next().getEndpointAddress().getAddress().toString(), testUrl2); + + XMLOutputFactory xof = XMLOutputFactory.newInstance(); + StringWriter testWriter = new StringWriter(); + XMLStreamWriter writer = xof.createXMLStreamWriter(testWriter); + der.encode(removeBroker, writer); + writer.flush(); + String encoded = testWriter.toString(); + Assert.assertEquals(encoded, testRemoveBroker); + } + + public void testReplaceBrokerConnection() throws Exception { + DefaultEncodingRegistry der = new DefaultEncodingRegistry(); + ReplaceBrokerConnectionEnDeCoder rbced = new ReplaceBrokerConnectionEnDeCoder(der); + rbced.start(); + RemovedBrokerEnDeCoder rbed = new RemovedBrokerEnDeCoder(der); + rbed.start(); + EndpointReferenceEnDeCoder epred = new EndpointReferenceEnDeCoder(der); + epred.start(); + EndpointAddressEnDeCoder eaed = new EndpointAddressEnDeCoder(der); + eaed.start(); + ReferencePropertiesEnDeCoder rped = new ReferencePropertiesEnDeCoder(der); + rped.start(); + BrokerIDEnDeCoder bied = new BrokerIDEnDeCoder(der); + bied.start(); + BrokerEnDeCoder bed = new BrokerEnDeCoder(der); + bed.start(); + BrokerConsumerReferenceEnDeCoder bcred = new BrokerConsumerReferenceEnDeCoder(der); + bcred.start(); + BrokerProducerReferenceEnDeCoder bpred = new BrokerProducerReferenceEnDeCoder(der); + bpred.start(); + NeighborsEnDeCoder nced = new NeighborsEnDeCoder(der); + nced.start(); + + XMLInputFactory xif = XMLInputFactory.newInstance(); + XMLStreamReader reader = xif.createXMLStreamReader(new StringReader(testReplaceBrokerConnection)); + reader.next(); + ReplaceBrokerConnection replaceBrokerConnection = (ReplaceBrokerConnection)der.decode(reader); + Assert.assertEquals(replaceBrokerConnection.getRemovedBroker().getReference().getEndpointAddress().getAddress().toString(), + testUrl); + Neighbors neighbors = replaceBrokerConnection.getNeighbors(); + Assert.assertEquals(neighbors.getBrokerSequence().get(0) + .getBrokerConsumerReference().getReference().getEndpointAddress().getAddress().toString(), + testUrl1); + Assert.assertEquals(neighbors.getBrokerSequence().get(0) + .getBrokerProducerReference().getReference().getEndpointAddress().getAddress().toString(), + testUrl2); + + XMLOutputFactory xof = XMLOutputFactory.newInstance(); + StringWriter testWriter = new StringWriter(); + XMLStreamWriter writer = xof.createXMLStreamWriter(testWriter); + der.encode(replaceBrokerConnection, writer); + writer.flush(); + String encoded = testWriter.toString(); + Assert.assertEquals(encoded, testReplaceBrokerConnection); + } +} |