summaryrefslogtreecommitdiffstats
path: root/branches/sca-java-1.2.1/modules/binding-notification/src
diff options
context:
space:
mode:
authordims <dims@13f79535-47bb-0310-9956-ffa450edef68>2008-06-17 00:23:01 +0000
committerdims <dims@13f79535-47bb-0310-9956-ffa450edef68>2008-06-17 00:23:01 +0000
commitbdd0a41aed7edf21ec2a65cfa17a86af2ef8c48a (patch)
tree38a92061c0793434c4be189f1d70c3458b6bc41d /branches/sca-java-1.2.1/modules/binding-notification/src
Move Tuscany from Incubator to top level.
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@668359 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'branches/sca-java-1.2.1/modules/binding-notification/src')
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/DefaultNotificationBindingFactory.java31
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBinding.java37
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingFactory.java27
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingImpl.java75
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingModuleActivator.java68
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingProcessor.java97
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingProviderFactory.java334
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBrokerManager.java40
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingInvoker.java131
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingProvider.java344
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java321
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManager.java43
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManagerImpl.java692
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/AbstractBroker.java44
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/AbstractBrokerEnDeCoder.java88
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/AbstractEnDeCoder.java50
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Broker.java25
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerConsumerReference.java26
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerConsumerReferenceEnDeCoder.java44
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerEnDeCoder.java46
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerID.java41
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerIDEnDeCoder.java91
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerProducerReference.java26
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerProducerReferenceEnDeCoder.java44
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Brokers.java41
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokersEnDeCoder.java89
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverride.java35
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverrideEnDeCoder.java83
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverrideResponse.java25
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverrideResponseEnDeCoder.java77
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Constants.java74
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConsumerReference.java26
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConsumerReferenceEnDeCoder.java44
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/DefaultEncodingRegistry.java78
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EnDeCoder.java62
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingException.java49
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingObject.java26
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingRegistry.java57
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingUtils.java85
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndConsumers.java26
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndConsumersEnDeCoder.java61
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndProducers.java26
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndProducersEnDeCoder.java61
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointAddress.java45
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointAddressEnDeCoder.java91
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReference.java44
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceEnDeCoder.java101
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceSequence.java54
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceSequenceEnDeCoder.java85
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceWrapper.java35
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceWrapperEnDeCoder.java74
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NeighborBrokerConsumers.java26
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NeighborBrokerConsumersEnDeCoder.java61
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Neighbors.java45
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NeighborsEnDeCoder.java89
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBroker.java25
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerAck.java25
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerAckEnDeCoder.java77
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerEnDeCoder.java46
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerResponse.java62
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerResponseEnDeCoder.java119
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumer.java26
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumerEnDeCoder.java44
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumerResponse.java26
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumerResponseEnDeCoder.java61
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducer.java26
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducerEnDeCoder.java44
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducerResponse.java26
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducerResponseEnDeCoder.java61
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReferenceProperties.java54
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReferencePropertiesEnDeCoder.java87
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemoveBroker.java44
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemoveBrokerEnDeCoder.java101
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemovedBroker.java26
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemovedBrokerEnDeCoder.java44
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReplaceBrokerConnection.java44
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReplaceBrokerConnectionEnDeCoder.java101
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Subscribe.java35
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/SubscribeEnDeCoder.java83
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/util/IOUtils.java180
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/util/NotificationServlet.java91
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/util/URIUtil.java42
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/main/resources/META-INF/services/org.apache.tuscany.sca.core.ModuleActivator18
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/test/java/org/apache/tuscany/sca/binding/notification/encoding/AxiomTestCase.java57
-rw-r--r--branches/sca-java-1.2.1/modules/binding-notification/src/test/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingTestCase.java470
85 files changed, 6685 insertions, 0 deletions
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/DefaultNotificationBindingFactory.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/DefaultNotificationBindingFactory.java
new file mode 100644
index 0000000000..e320357d93
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/DefaultNotificationBindingFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification;
+
+/**
+ * Creates runtime artifacts for the notification binding
+ *
+ * @version $Rev$ $Date$
+ */
+public class DefaultNotificationBindingFactory implements NotificationBindingFactory {
+
+ public NotificationBinding createNotificationBinding() {
+ return new NotificationBindingImpl();
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBinding.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBinding.java
new file mode 100644
index 0000000000..f09798859b
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBinding.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification;
+
+import java.net.URI;
+
+import org.apache.tuscany.sca.assembly.Binding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public interface NotificationBinding extends Binding {
+
+ String getNtmAddress();
+
+ void setNtmAddress(String ntm);
+
+ URI getNotificationType();
+
+ void setNotificationType(URI notificationType);
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingFactory.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingFactory.java
new file mode 100644
index 0000000000..2749f5f958
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public interface NotificationBindingFactory {
+
+ NotificationBinding createNotificationBinding();
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingImpl.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingImpl.java
new file mode 100644
index 0000000000..992cbb6c2d
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingImpl.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification;
+
+import java.net.URI;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class NotificationBindingImpl implements NotificationBinding {
+ private String name;
+ private String uri;
+ protected String ntmAddress;
+ protected URI notificationType;
+
+ @Override
+ public Object clone() {
+ return this;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getURI() {
+ return uri;
+ }
+
+ public void setURI(String uri) {
+ this.uri = uri;
+ }
+
+ public String getNtmAddress() {
+ return ntmAddress;
+ }
+
+ public void setNtmAddress(String ntmAddress) {
+ this.ntmAddress = ntmAddress;
+ }
+
+ public URI getNotificationType() {
+ return notificationType;
+ }
+
+ public void setNotificationType(URI notificationType) {
+ this.notificationType = notificationType;
+ }
+
+ public void setUnresolved(boolean unresolved) {
+ }
+
+ public boolean isUnresolved() {
+ return false;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingModuleActivator.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingModuleActivator.java
new file mode 100644
index 0000000000..2cefaa8bba
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingModuleActivator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification;
+
+import org.apache.tuscany.sca.assembly.AssemblyFactory;
+import org.apache.tuscany.sca.assembly.DefaultAssemblyFactory;
+import org.apache.tuscany.sca.binding.notification.encoding.DefaultEncodingRegistry;
+import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessorExtensionPoint;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.ModuleActivator;
+import org.apache.tuscany.sca.host.http.ExtensibleServletHost;
+import org.apache.tuscany.sca.host.http.ServletHost;
+import org.apache.tuscany.sca.host.http.ServletHostExtensionPoint;
+import org.apache.tuscany.sca.policy.DefaultPolicyFactory;
+import org.apache.tuscany.sca.policy.PolicyFactory;
+import org.apache.tuscany.sca.provider.ProviderFactoryExtensionPoint;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class NotificationBindingModuleActivator implements ModuleActivator {
+
+ private NotificationBindingProcessor bindingProcessor;
+
+ private DefaultEncodingRegistry encodingRegistry;
+ private ServletHost servletHost;
+
+
+ public void start(ExtensionPointRegistry registry) {
+ encodingRegistry = new DefaultEncodingRegistry();
+ servletHost = new ExtensibleServletHost(registry.getExtensionPoint(ServletHostExtensionPoint.class));
+
+ AssemblyFactory assemblyFactory = new DefaultAssemblyFactory();
+ PolicyFactory policyFactory = new DefaultPolicyFactory();
+ DefaultNotificationBindingFactory bindingFactory = new DefaultNotificationBindingFactory();
+ bindingProcessor = new NotificationBindingProcessor(assemblyFactory, policyFactory, bindingFactory);
+ StAXArtifactProcessorExtensionPoint processors = registry.getExtensionPoint(StAXArtifactProcessorExtensionPoint.class);
+ processors.addArtifactProcessor(bindingProcessor);
+
+ NotificationBindingProviderFactory nbpf = new NotificationBindingProviderFactory(servletHost,
+ encodingRegistry);
+ ProviderFactoryExtensionPoint providerFactories = registry.getExtensionPoint(ProviderFactoryExtensionPoint.class);
+ providerFactories.addProviderFactory(nbpf);
+ }
+
+ public void stop(ExtensionPointRegistry registry) {
+ encodingRegistry.stop();
+ StAXArtifactProcessorExtensionPoint processors = registry.getExtensionPoint(StAXArtifactProcessorExtensionPoint.class);
+ processors.removeArtifactProcessor(bindingProcessor);
+ }
+
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingProcessor.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingProcessor.java
new file mode 100644
index 0000000000..0c9e1df1bc
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingProcessor.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+import org.apache.tuscany.sca.assembly.AssemblyFactory;
+import org.apache.tuscany.sca.assembly.xml.Constants;
+import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessor;
+import org.apache.tuscany.sca.contribution.resolver.ModelResolver;
+import org.apache.tuscany.sca.contribution.service.ContributionReadException;
+import org.apache.tuscany.sca.contribution.service.ContributionResolveException;
+import org.apache.tuscany.sca.contribution.service.ContributionWriteException;
+import org.apache.tuscany.sca.policy.PolicyFactory;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class NotificationBindingProcessor implements StAXArtifactProcessor<NotificationBinding> {
+
+ protected static final QName BINDING_NOTIFICATION = new QName(Constants.SCA10_TUSCANY_NS, "binding.notification");
+
+ private NotificationBindingFactory bindingFactory;
+
+ public NotificationBindingProcessor(AssemblyFactory assemblyFactory,
+ PolicyFactory policyFactory,
+ NotificationBindingFactory bindingFactory) {
+ this.bindingFactory = bindingFactory;
+ }
+
+ public QName getArtifactType() {
+ return BINDING_NOTIFICATION;
+ }
+
+ public Class<NotificationBinding> getModelType() {
+ return NotificationBinding.class;
+ }
+
+ public NotificationBinding read(XMLStreamReader reader) throws ContributionReadException, XMLStreamException {
+ assert BINDING_NOTIFICATION.equals(reader.getName());
+ String bindingUri = reader.getAttributeValue(null, "uri");
+ String name = reader.getAttributeValue(null, "name");
+ String ntm = reader.getAttributeValue(null, "ntm");
+ String notificationType = reader.getAttributeValue(null, "notificationType");
+
+ NotificationBinding binding = bindingFactory.createNotificationBinding();
+ if (name != null) {
+ binding.setName(name);
+ }
+ if (bindingUri != null) {
+ binding.setURI(bindingUri);
+ }
+ if (ntm != null) {
+ binding.setNtmAddress(ntm);
+ }
+ if (notificationType != null) {
+ try {
+ binding.setNotificationType(new URI(notificationType));
+ } catch(URISyntaxException e) {
+ throw new ContributionReadException(e);
+ }
+ }
+ return binding;
+ }
+
+ public void write(NotificationBinding notificationBinding, XMLStreamWriter writer)
+ throws ContributionWriteException, XMLStreamException {
+
+ //FIXME Implement this method
+ }
+
+ public void resolve(NotificationBinding notificationBinding, ModelResolver resolver) throws ContributionResolveException {
+ }
+
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingProviderFactory.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingProviderFactory.java
new file mode 100644
index 0000000000..7592f4f824
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBindingProviderFactory.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification;
+
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.tuscany.sca.binding.notification.encoding.AbstractEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.BrokerConsumerReferenceEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.BrokerEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.BrokerID;
+import org.apache.tuscany.sca.binding.notification.encoding.BrokerIDEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.BrokerProducerReferenceEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.BrokersEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.ConnectionOverrideEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.ConnectionOverrideResponseEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.ConsumerReferenceEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.EncodingRegistry;
+import org.apache.tuscany.sca.binding.notification.encoding.EndConsumersEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.EndProducersEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.EndpointAddressEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.EndpointReference;
+import org.apache.tuscany.sca.binding.notification.encoding.EndpointReferenceEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.NeighborBrokerConsumersEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.NeighborsEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.NewBrokerAckEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.NewBrokerEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.NewBrokerResponseEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.NewConsumerEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.NewConsumerResponseEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.NewProducerEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.NewProducerResponseEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.ReferencePropertiesEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.RemoveBrokerEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.RemovedBrokerEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.ReplaceBrokerConnectionEnDeCoder;
+import org.apache.tuscany.sca.binding.notification.encoding.SubscribeEnDeCoder;
+import org.apache.tuscany.sca.host.http.ServletHost;
+import org.apache.tuscany.sca.provider.BindingProviderFactory;
+import org.apache.tuscany.sca.provider.ReferenceBindingProvider;
+import org.apache.tuscany.sca.provider.ServiceBindingProvider;
+import org.apache.tuscany.sca.runtime.RuntimeComponent;
+import org.apache.tuscany.sca.runtime.RuntimeComponentReference;
+import org.apache.tuscany.sca.runtime.RuntimeComponentService;
+
+/**
+ * The runtime representation of the notification reference binding
+ *
+ * @version $Rev$ $Date$
+ */
+public class NotificationBindingProviderFactory implements BindingProviderFactory<NotificationBinding>,
+ NotificationBrokerManager {
+ private static final String DEFAULT_PORT = "8083";
+
+ private ServletHost servletHost;
+ private NotificationTypeManagerImpl notificationTypeManager;
+ private EncodingRegistry encodingRegistry;
+ private String httpUrl;
+ private Map<URI, NotificationReferenceBindingProvider> referenceBindingProviders;
+ private Map<URI, NotificationServiceBindingProvider> serviceBindingProviders;
+
+ private static NotificationBindingProviderFactory factoryInstance = null;
+
+ public NotificationBindingProviderFactory(ServletHost servletHost, EncodingRegistry encodingRegistry) {
+ this.servletHost = servletHost;
+ this.encodingRegistry = encodingRegistry;
+ this.referenceBindingProviders = new HashMap<URI, NotificationReferenceBindingProvider>();
+ this.serviceBindingProviders = new HashMap<URI, NotificationServiceBindingProvider>();
+
+ factoryInstance = this;
+ }
+
+ public Class<NotificationBinding> getModelType() {
+ return NotificationBinding.class;
+ }
+
+ public ReferenceBindingProvider createReferenceBindingProvider(RuntimeComponent component,
+ RuntimeComponentReference reference,
+ NotificationBinding binding) {
+ init();
+ URI notificationType = binding.getNotificationType();
+ if (!validReferenceBinding(binding)) {
+ throw new RuntimeException("Binding not valid");
+ }
+ NotificationReferenceBindingProvider referenceBindingProvider = referenceBindingProviders.get(notificationType);
+ if (referenceBindingProvider == null) {
+ referenceBindingProvider =
+ new NotificationReferenceBindingProvider(binding, component, reference, servletHost,
+ notificationTypeManager, encodingRegistry, httpUrl, this);
+ referenceBindingProviders.put(notificationType, referenceBindingProvider);
+ }
+ return referenceBindingProvider;
+ }
+
+ public ServiceBindingProvider createServiceBindingProvider(RuntimeComponent component,
+ RuntimeComponentService service,
+ NotificationBinding binding) {
+ init();
+ URI notificationType = binding.getNotificationType();
+ if (!validServiceBinding(binding)) {
+ throw new RuntimeException("Binding not valid");
+ }
+ NotificationServiceBindingProvider serviceBindingProvider = serviceBindingProviders.get(notificationType);
+ if (serviceBindingProvider == null) {
+ serviceBindingProvider =
+ new NotificationServiceBindingProvider(binding, component, service, servletHost,
+ notificationTypeManager, encodingRegistry, httpUrl, this);
+ serviceBindingProviders.put(notificationType, serviceBindingProvider);
+ }
+ return serviceBindingProvider;
+ }
+
+ private boolean validServiceBinding(NotificationBinding binding) {
+ URI notificationType = binding.getNotificationType();
+ NotificationReferenceBindingProvider referenceBindingProvider = referenceBindingProviders.get(notificationType);
+ if (referenceBindingProvider != null) {
+ return validBinding(binding, referenceBindingProvider.getBinding());
+ }
+ return true;
+ }
+
+ private boolean validReferenceBinding(NotificationBinding binding) {
+ URI notificationType = binding.getNotificationType();
+ NotificationServiceBindingProvider serviceBindingProvider = serviceBindingProviders.get(notificationType);
+ if (serviceBindingProvider != null) {
+ return validBinding(binding, serviceBindingProvider.getBinding());
+ }
+ return true;
+ }
+
+ private boolean validBinding(NotificationBinding binding1, NotificationBinding binding2) {
+ String binding1NtmAddress = binding1.getNtmAddress();
+ String binding2NtmAddress = binding2.getNtmAddress();
+ if (binding1NtmAddress == null && binding2NtmAddress == null) {
+ return true;
+ } else if (binding1NtmAddress == null || binding2NtmAddress == null) {
+ return false;
+ } else {
+ return binding1NtmAddress.equals(binding2NtmAddress);
+ }
+ }
+
+ /*
+ * These methods are intended to be called by the binding providers' start
+ * methods. By the time this happens, both referenceBindingProvider != null &&
+ * serviceBindingProvider != null, if they are ever going to be
+ */
+ public void serviceProviderStarted(URI notificationType,
+ NotificationServiceBindingProvider serviceBindingProvider,
+ URL remoteNtmUrl) {
+ NotificationReferenceBindingProvider referenceBindingProvider = referenceBindingProviders.get(notificationType);
+ if (referenceBindingProvider == null) {
+ serviceBindingProvider.deployConsumer();
+ } else if (referenceBindingProvider.isStarted()) {
+ String brokerID = BrokerID.generate();
+ deployBroker(notificationType, serviceBindingProvider, referenceBindingProvider, brokerID, remoteNtmUrl);
+ }
+ }
+
+ public void referenceProviderStarted(URI notificationType,
+ NotificationReferenceBindingProvider referenceBindingProvider,
+ URL remoteNtmUrl) {
+ NotificationServiceBindingProvider serviceBindingProvider = serviceBindingProviders.get(notificationType);
+ if (serviceBindingProvider == null) {
+ referenceBindingProvider.deployProducer();
+ } else if (serviceBindingProvider.isStarted()) {
+ String brokerID = BrokerID.generate();
+ deployBroker(notificationType, serviceBindingProvider, referenceBindingProvider, brokerID, remoteNtmUrl);
+ }
+ }
+
+ private void deployBroker(URI notificationType,
+ NotificationServiceBindingProvider serviceBindingProvider,
+ NotificationReferenceBindingProvider referenceBindingProvider,
+ String brokerID,
+ URL remoteNtmUrl) {
+ URL consumerUrl = serviceBindingProvider.getURL();
+ URL producerUrl = referenceBindingProvider.getURL();
+ List<EndpointReference> consumerList = new ArrayList<EndpointReference>();
+ List<EndpointReference> producerList = new ArrayList<EndpointReference>();
+ boolean firstBroker =
+ notificationTypeManager.newBroker(notificationType,
+ consumerUrl,
+ producerUrl,
+ brokerID,
+ remoteNtmUrl,
+ consumerList,
+ producerList);
+ if (firstBroker) {
+ serviceBindingProvider.deployBroker(brokerID, null, producerList);
+ referenceBindingProvider.deployBroker(brokerID, null, consumerList);
+ if (!consumerList.isEmpty() || !producerList.isEmpty()) {
+ notificationTypeManager.newBrokerAck(remoteNtmUrl);
+ }
+ } else {
+ // returned lists contain broker consumers and producers and are the
+ // same length
+ int index = consumerList.size() - 1;
+ // establish connection with picked broker
+ EndpointReference brokerConsumerEPR = consumerList.get(index);
+ EndpointReference brokerProducerEPR = producerList.get(index);
+ serviceBindingProvider.deployBroker(brokerID, brokerProducerEPR, null);
+ referenceBindingProvider.deployBroker(brokerID, brokerConsumerEPR, null);
+ }
+ }
+
+ public void replaceConsumersBrokerConnection(URI notificationType, EndpointReference chosenBrokerProducerEpr) {
+ NotificationServiceBindingProvider serviceBindingProvider = serviceBindingProviders.get(notificationType);
+ if (serviceBindingProvider == null) {
+ throw new RuntimeException("Missing service binding provider for [" + notificationType + "]");
+ }
+ serviceBindingProvider.replaceBrokerConnection(chosenBrokerProducerEpr);
+ }
+
+ public static void removeBroker(URI notificationType) {
+ if (factoryInstance == null) {
+ throw new RuntimeException("Missing factory instance");
+ }
+ NotificationReferenceBindingProvider referenceBindingProvider =
+ factoryInstance.referenceBindingProviders.get(notificationType);
+ NotificationServiceBindingProvider serviceBindingProvider =
+ factoryInstance.serviceBindingProviders.get(notificationType);
+ if (referenceBindingProvider == null || serviceBindingProvider == null) {
+ throw new RuntimeException("Not a broker for [" + notificationType + "]");
+ }
+ referenceBindingProvider.undeployBroker(serviceBindingProvider.getURL());
+ }
+
+ private String getBaseURI() {
+ if (httpUrl == null) {
+ String httpPort = System.getProperty("notification.httpPort");
+ if (httpPort == null) {
+ httpPort = DEFAULT_PORT;
+ }
+ String localHost = null;
+ try {
+ localHost = InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (Exception e) {
+ e.printStackTrace();
+ localHost = "localhost";
+ }
+ httpUrl = "http://" + localHost + ((httpPort != null) ? (":" + httpPort) : "");
+ }
+ return httpUrl;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void init() {
+ if (notificationTypeManager == null) {
+ AbstractEnDeCoder subscribeEnDeCoder = new SubscribeEnDeCoder(encodingRegistry);
+ subscribeEnDeCoder.start();
+ AbstractEnDeCoder consumerReferenceEnDeCoder = new ConsumerReferenceEnDeCoder(encodingRegistry);
+ consumerReferenceEnDeCoder.start();
+ AbstractEnDeCoder endpointAddressEnDeCoder = new EndpointAddressEnDeCoder(encodingRegistry);
+ endpointAddressEnDeCoder.start();
+ AbstractEnDeCoder newConsumerEnDeCoder = new NewConsumerEnDeCoder(encodingRegistry);
+ newConsumerEnDeCoder.start();
+ AbstractEnDeCoder newProducerEnDeCoder = new NewProducerEnDeCoder(encodingRegistry);
+ newProducerEnDeCoder.start();
+ AbstractEnDeCoder newConsumerResponseEnDeCoder = new NewConsumerResponseEnDeCoder(encodingRegistry);
+ newConsumerResponseEnDeCoder.start();
+ AbstractEnDeCoder newProducerResponseEnDeCoder = new NewProducerResponseEnDeCoder(encodingRegistry);
+ newProducerResponseEnDeCoder.start();
+ AbstractEnDeCoder newBrokerEnDeCoder = new NewBrokerEnDeCoder(encodingRegistry);
+ newBrokerEnDeCoder.start();
+ AbstractEnDeCoder brokerConsumerReferenceEnDeCoder = new BrokerConsumerReferenceEnDeCoder(encodingRegistry);
+ brokerConsumerReferenceEnDeCoder.start();
+ AbstractEnDeCoder brokerProducerReferenceEnDeCoder = new BrokerProducerReferenceEnDeCoder(encodingRegistry);
+ brokerProducerReferenceEnDeCoder.start();
+ AbstractEnDeCoder newBrokerResponseEnDeCoder = new NewBrokerResponseEnDeCoder(encodingRegistry);
+ newBrokerResponseEnDeCoder.start();
+ AbstractEnDeCoder brokersEnDeCoder = new BrokersEnDeCoder(encodingRegistry);
+ brokersEnDeCoder.start();
+ AbstractEnDeCoder brokerEnDeCoder = new BrokerEnDeCoder(encodingRegistry);
+ brokerEnDeCoder.start();
+ AbstractEnDeCoder endConsumersEnDeCoder = new EndConsumersEnDeCoder(encodingRegistry);
+ endConsumersEnDeCoder.start();
+ AbstractEnDeCoder endProducersEnDeCoder = new EndProducersEnDeCoder(encodingRegistry);
+ endProducersEnDeCoder.start();
+ AbstractEnDeCoder endpointReferenceEnDeCoder = new EndpointReferenceEnDeCoder(encodingRegistry);
+ endpointReferenceEnDeCoder.start();
+ AbstractEnDeCoder referencePropertiesEnDeCoder = new ReferencePropertiesEnDeCoder(encodingRegistry);
+ referencePropertiesEnDeCoder.start();
+ AbstractEnDeCoder brokerIDEnDeCoder = new BrokerIDEnDeCoder(encodingRegistry);
+ brokerIDEnDeCoder.start();
+ AbstractEnDeCoder connectionOverrideEnDeCoder = new ConnectionOverrideEnDeCoder(encodingRegistry);
+ connectionOverrideEnDeCoder.start();
+ AbstractEnDeCoder connectionOverrideResponseEnDeCoder =
+ new ConnectionOverrideResponseEnDeCoder(encodingRegistry);
+ connectionOverrideResponseEnDeCoder.start();
+ AbstractEnDeCoder newBrokerAckEnDeCoder = new NewBrokerAckEnDeCoder(encodingRegistry);
+ newBrokerAckEnDeCoder.start();
+ AbstractEnDeCoder neighborBrokerConsumersEnDeCoder = new NeighborBrokerConsumersEnDeCoder(encodingRegistry);
+ neighborBrokerConsumersEnDeCoder.start();
+ AbstractEnDeCoder removeBrokerEnDeCoder = new RemoveBrokerEnDeCoder(encodingRegistry);
+ removeBrokerEnDeCoder.start();
+ AbstractEnDeCoder removedBrokerEnDeCoder = new RemovedBrokerEnDeCoder(encodingRegistry);
+ removedBrokerEnDeCoder.start();
+ AbstractEnDeCoder neighborsEnDeCoder = new NeighborsEnDeCoder(encodingRegistry);
+ neighborsEnDeCoder.start();
+ AbstractEnDeCoder replaceBrokerConnectionEnDeCoder = new ReplaceBrokerConnectionEnDeCoder(encodingRegistry);
+ replaceBrokerConnectionEnDeCoder.start();
+
+ notificationTypeManager = new NotificationTypeManagerImpl();
+ notificationTypeManager.setServletHost(servletHost);
+ notificationTypeManager.setEncodingRegistry(encodingRegistry);
+ notificationTypeManager.init();
+
+ getBaseURI();
+ }
+ }
+
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBrokerManager.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBrokerManager.java
new file mode 100644
index 0000000000..f088025a23
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationBrokerManager.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification;
+
+import java.net.URI;
+import java.net.URL;
+
+import org.apache.tuscany.sca.binding.notification.encoding.EndpointReference;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public interface NotificationBrokerManager {
+
+ void serviceProviderStarted(URI notificationType,
+ NotificationServiceBindingProvider serviceBindingProvider,
+ URL remoteNtmUrl);
+
+ void referenceProviderStarted(URI notificationType,
+ NotificationReferenceBindingProvider referenceBindingProvider,
+ URL remoteNtmUrl);
+
+ void replaceConsumersBrokerConnection(URI notificationType, EndpointReference chosenBrokerProducerEpr);
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingInvoker.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingInvoker.java
new file mode 100644
index 0000000000..b594fde29f
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingInvoker.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification;
+
+import java.io.OutputStream;
+import java.util.HashMap;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.tuscany.sca.binding.notification.NotificationReferenceBindingProvider.SubscriberInfo;
+import org.apache.tuscany.sca.binding.notification.encoding.Constants;
+import org.apache.tuscany.sca.binding.notification.util.IOUtils;
+import org.apache.tuscany.sca.binding.notification.util.IOUtils.IOUtilsException;
+import org.apache.tuscany.sca.binding.notification.util.IOUtils.Writeable;
+import org.apache.tuscany.sca.implementation.notification.ImmutableMessage;
+import org.apache.tuscany.sca.interfacedef.Operation;
+import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.invocation.Message;
+
+/**
+ * Turns invoke into remote message fan-out
+ *
+ * @version $Rev$ $Date$
+ */
+public class NotificationReferenceBindingInvoker implements Invoker {
+
+ private static final Message RESPONSE = new ImmutableMessage();
+ private Operation operation;
+
+ private NotificationReferenceBindingProvider notificationReferenceBindingProvider;
+
+ public NotificationReferenceBindingInvoker(Operation operation,
+ NotificationReferenceBindingProvider notificationReferenceBindingProvider) {
+ this.operation = operation;
+ this.notificationReferenceBindingProvider = notificationReferenceBindingProvider;
+ }
+
+ public Message invoke(Message msg) {
+ Object payload = msg.getBody();
+ if (payload == null) {
+ throw new RuntimeException("Message body is null");
+ }
+ Writeable writeable = null;
+ String incomingBrokerID = null;
+ if (payload.getClass().isArray()) {
+ Object[] bodyArray = (Object[])payload;
+ if (bodyArray.length == 3) {
+ writeable = getWriteableFromByteArray((byte[])bodyArray[1]);
+ incomingBrokerID = (String)bodyArray[2];
+ }
+ else if (bodyArray.length == 1) {
+ writeable = getWriteableFromPayload(bodyArray[0]);
+ }
+ else {
+ throw new RuntimeException("Invalid body array size");
+ }
+ }
+ else {
+ writeable = getWriteableFromPayload(payload);
+ }
+
+ try {
+ for (SubscriberInfo subscriber : notificationReferenceBindingProvider.getSubscribers()) {
+ // check for each subscriber's broker id and skip if equal
+ if (incomingBrokerID != null && subscriber.brokerID != null && incomingBrokerID.equals(subscriber.brokerID)) {
+ continue;
+ }
+ HashMap<String, String> headers = new HashMap<String, String>();
+ headers.put(IOUtils.Notification_Operation, operation.getName());
+ String brokerID = notificationReferenceBindingProvider.getBrokerID();
+ if (brokerID != null) {
+ headers.put(Constants.Broker_ID, brokerID);
+ }
+ IOUtils.sendHttpRequest(subscriber.address, headers, writeable, null);
+ }
+ } catch(Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException("Sender caught exception", e);
+ }
+ return RESPONSE;
+ }
+
+ private Writeable getWriteableFromPayload(Object payload) throws RuntimeException {
+ if (!(payload instanceof OMElement)) {
+ throw new RuntimeException("payload not OMElement");
+ }
+ final OMElement element = (OMElement)payload;
+ Writeable writeable = new Writeable() {
+ public void write(OutputStream os) throws IOUtilsException {
+ try {
+ element.serialize(os);
+ os.flush();
+ }
+ catch(Exception e) {
+ throw new IOUtilsException(e);
+ }
+ }
+ };
+ return writeable;
+ }
+
+ private Writeable getWriteableFromByteArray(final byte[] payload) {
+ Writeable writeable = new Writeable() {
+ public void write(OutputStream os) throws IOUtilsException {
+ try {
+ os.write(payload);
+ os.flush();
+ }
+ catch(Exception e) {
+ throw new IOUtilsException(e);
+ }
+ }
+ };
+ return writeable;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingProvider.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingProvider.java
new file mode 100644
index 0000000000..59ab37b2eb
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationReferenceBindingProvider.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification;
+
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletOutputStream;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.tuscany.sca.binding.notification.encoding.Broker;
+import org.apache.tuscany.sca.binding.notification.encoding.BrokerID;
+import org.apache.tuscany.sca.binding.notification.encoding.ConnectionOverride;
+import org.apache.tuscany.sca.binding.notification.encoding.Constants;
+import org.apache.tuscany.sca.binding.notification.encoding.EncodingObject;
+import org.apache.tuscany.sca.binding.notification.encoding.EncodingRegistry;
+import org.apache.tuscany.sca.binding.notification.encoding.EncodingUtils;
+import org.apache.tuscany.sca.binding.notification.encoding.EndpointReference;
+import org.apache.tuscany.sca.binding.notification.encoding.ReplaceBrokerConnection;
+import org.apache.tuscany.sca.binding.notification.encoding.Subscribe;
+import org.apache.tuscany.sca.binding.notification.util.NotificationServlet;
+import org.apache.tuscany.sca.binding.notification.util.URIUtil;
+import org.apache.tuscany.sca.binding.notification.util.NotificationServlet.NotificationServletStreamHandler;
+import org.apache.tuscany.sca.host.http.ServletHost;
+import org.apache.tuscany.sca.interfacedef.Interface;
+import org.apache.tuscany.sca.interfacedef.InterfaceContract;
+import org.apache.tuscany.sca.interfacedef.Operation;
+import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.provider.ReferenceBindingProvider;
+import org.apache.tuscany.sca.runtime.RuntimeComponent;
+import org.apache.tuscany.sca.runtime.RuntimeComponentReference;
+
+/**
+ * The runtime representaion of the notification reference binding
+ *
+ * @version $Rev$ $Date$
+ */
+public class NotificationReferenceBindingProvider
+ implements ReferenceBindingProvider, NotificationServletStreamHandler {
+
+ private static final String producerPathBase = "/producer";
+ private NotificationReferenceBindingInvoker invoker;
+ private RuntimeComponentReference reference;
+ private NotificationBinding notificationBinding;
+ private ServletHost servletHost;
+ private NotificationTypeManager ntm;
+ private EncodingRegistry encodingRegistry;
+ private URI notificationType;
+ private URL myUrl;
+ private URL remoteNtmUrl;
+ private boolean started;
+ private NotificationBrokerManager brokerManager;
+
+ private List<SubscriberInfo> subscribers;
+ private String brokerID;
+
+ public NotificationReferenceBindingProvider(NotificationBinding notificationBinding,
+ RuntimeComponent component,
+ RuntimeComponentReference reference,
+ ServletHost servletHost,
+ NotificationTypeManager ntm,
+ EncodingRegistry encodingRegistry,
+ String httpUrl,
+ NotificationBrokerManager brokerManager) {
+ this.invoker = null;
+ this.notificationBinding = notificationBinding;
+ this.reference = reference;
+ this.servletHost = servletHost;
+ this.ntm = ntm;
+ this.encodingRegistry = encodingRegistry;
+ this.notificationType = notificationBinding.getNotificationType();
+ String ntmAddress = notificationBinding.getNtmAddress();
+ String notificationTypePath = URIUtil.getPath(notificationType);
+ try {
+ this.myUrl = new URL(httpUrl + producerPathBase + notificationTypePath);
+ this.remoteNtmUrl = null;
+ if (ntmAddress != null && notificationType != null) {
+ remoteNtmUrl = new URL(ntmAddress + notificationTypePath);
+ }
+ }
+ catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ this.started = false;
+ this.brokerManager = brokerManager;
+
+ URI uri = URI.create(component.getURI() + "/" + notificationBinding.getName());
+ notificationBinding.setURI(uri.toString());
+ Interface interfaze = reference.getInterfaceContract().getInterface();
+ interfaze.resetDataBinding(OMElement.class.getName());
+ for (Operation operation : interfaze.getOperations()) {
+ operation.setNonBlocking(false);
+ }
+
+ this.subscribers = new ArrayList<SubscriberInfo>();
+ this.brokerID = null;
+ }
+
+ public NotificationBinding getBinding() {
+ return notificationBinding;
+ }
+
+ public URL getURL() {
+ return myUrl;
+ }
+
+ public boolean isStarted() {
+ return started;
+ }
+
+ public void setBrokerID(String brokerID) {
+ this.brokerID = brokerID;
+ }
+
+ public String getBrokerID() {
+ return brokerID;
+ }
+
+ public Invoker createInvoker(Operation operation) {
+ if (invoker == null) {
+ invoker = new NotificationReferenceBindingInvoker(operation, this);
+ }
+ return invoker;
+ }
+
+ public boolean supportsOneWayInvocation() {
+ return false;
+ }
+
+ public InterfaceContract getBindingInterfaceContract() {
+ return reference.getInterfaceContract();
+ }
+
+ public void start() {
+ if (started) {
+ return;
+ }
+
+ brokerManager.referenceProviderStarted(notificationType, this, remoteNtmUrl);
+ started = true;
+ }
+
+ public void stop() {
+ }
+
+ public void deployProducer() {
+ List<URL> consumerList = new ArrayList<URL>();
+ String sequenceType;
+ try {
+ sequenceType = ntm.newProducer(notificationType, myUrl, remoteNtmUrl, consumerList);
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ if (Constants.EndConsumers.equals(sequenceType)) {
+ for (URL consumerUrl : consumerList) {
+ addSubscriberUrl(consumerUrl);
+ }
+ }
+ else if (Constants.BrokerConsumers.equals(sequenceType)) {
+ // Pick a broker consumer, for now the first one
+ URL consumerUrl = consumerList.get(0);
+ addSubscriberUrl(consumerUrl);
+ }
+
+ servletHost.addServletMapping(myUrl.toString(), new NotificationServlet(this));
+ }
+
+ public void deployBroker(String brokerID, EndpointReference brokerConsumerEPR, List<EndpointReference> consumerList) {
+ if (brokerConsumerEPR != null) {
+ addSubscriber(brokerConsumerEPR);
+ }
+ if (consumerList != null && !consumerList.isEmpty()) {
+ for (EndpointReference consumerEPR : consumerList) {
+ addSubscriber(consumerEPR);
+ }
+ }
+ setBrokerID(brokerID);
+ servletHost.addServletMapping(myUrl.toString(), new NotificationServlet(this));
+ }
+
+ public void undeployBroker(URL brokerConsumerUrl) {
+ EndpointReference brokerConsumerEpr = EncodingUtils.createEndpointReference(brokerConsumerUrl, getBrokerID());
+ ntm.removeBroker(brokerConsumerEpr, getNeighborBrokerConsumerEprs(), remoteNtmUrl);
+ removeBrokerSubscribers();
+ }
+
+ public void handle(Map<String, String> headers, ServletInputStream istream, int contentLength, ServletOutputStream ostream) {
+
+ try {
+ EncodingObject eo = EncodingUtils.decodeFromStream(encodingRegistry, istream);
+ if (eo instanceof Subscribe) {
+ Subscribe sub = (Subscribe)eo;
+ addSubscriber(sub.getConsumerReference().getReference());
+ }
+ else if (eo instanceof ConnectionOverride) {
+ ConnectionOverride co = (ConnectionOverride)eo;
+ replaceSubscribers(co.getBrokerConsumerReference().getReference());
+ }
+ else if (eo instanceof ReplaceBrokerConnection) {
+ ReplaceBrokerConnection rbc = (ReplaceBrokerConnection)eo;
+ URL removedBrokerConsumerEpr = rbc.getRemovedBroker().getReference().getEndpointAddress().getAddress();
+ if (rbc.getNeighbors() != null) {
+ int choice = rbc.getNeighbors().getBrokerSequence().size() - 1;
+ Broker chosenBroker = rbc.getNeighbors().getBrokerSequence().get(choice);
+ replaceBrokerSubscriber(removedBrokerConsumerEpr,
+ chosenBroker.getBrokerConsumerReference().getReference());
+ brokerManager.replaceConsumersBrokerConnection(notificationType,
+ chosenBroker.getBrokerProducerReference().getReference());
+ }
+ else {
+ replaceBrokerSubscriber(removedBrokerConsumerEpr, null);
+ }
+ }
+ else {
+ throw new RuntimeException("Unknown encoding object");
+ }
+ } catch(Throwable e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ public synchronized List<SubscriberInfo> getSubscribers() {
+ return subscribers;
+ }
+
+ private void addSubscriberUrl(URL subscriberUrl) {
+ addSubscriber(subscriberUrl, null);
+ }
+
+ private void addSubscriber(EndpointReference subscriberEPR) {
+ BrokerID brokerID = null;
+ if (subscriberEPR.getReferenceProperties() != null) {
+ brokerID = subscriberEPR.getReferenceProperties().getProperty(BrokerID.class);
+ }
+ addSubscriber(subscriberEPR.getEndpointAddress().getAddress(), (brokerID != null ? brokerID.getID() : null));
+ }
+
+ private void addSubscriber(URL address, String brokerID) {
+ synchronized(this) {
+ SubscriberInfo si = new SubscriberInfo(address);
+ si.brokerID = brokerID;
+ if (subscribers == null) {
+ subscribers = new ArrayList<SubscriberInfo>();
+ }
+ subscribers.add(si);
+ }
+ }
+
+ private void replaceSubscribers(EndpointReference brokerConsumerEPR) {
+ synchronized(this) {
+ subscribers = null;
+ }
+ addSubscriber(brokerConsumerEPR);
+ }
+
+ private void replaceBrokerSubscriber(URL removedBrokerConsumerUrl, EndpointReference chosenBrokerConsumerEpr) {
+ synchronized(this) {
+ if (subscribers == null) {
+ throw new RuntimeException("No subscribers");
+ }
+ SubscriberInfo siToRemove = null;
+ for (SubscriberInfo si : subscribers) {
+ if (si.address.equals(removedBrokerConsumerUrl)) {
+ siToRemove = si;
+ }
+ }
+ if (siToRemove == null) {
+ throw new RuntimeException("Can't find info for broker to remove [" + removedBrokerConsumerUrl + "]");
+ }
+ if (!subscribers.remove(siToRemove)) {
+ throw new RuntimeException("Can't remove info for [" + siToRemove.address + "]");
+ }
+ }
+ if (chosenBrokerConsumerEpr != null) {
+ addSubscriber(chosenBrokerConsumerEpr);
+ }
+ }
+
+ private List<EndpointReference> getNeighborBrokerConsumerEprs() {
+ synchronized(this) {
+ if (subscribers == null) {
+ throw new RuntimeException("No subscribers");
+ }
+ List<EndpointReference> neighborBrokerConsumerEprs = new ArrayList<EndpointReference>();
+ for(SubscriberInfo si : subscribers) {
+ if (si.brokerID != null) {
+ neighborBrokerConsumerEprs.add(EncodingUtils.createEndpointReference(si.address, si.brokerID));
+ }
+ }
+
+ return neighborBrokerConsumerEprs;
+ }
+ }
+
+ private void removeBrokerSubscribers() {
+ synchronized(this) {
+ if (subscribers == null) {
+ throw new RuntimeException("No subscribers");
+ }
+ List<SubscriberInfo> sisToRemove = new ArrayList<SubscriberInfo>();
+ for (SubscriberInfo si : subscribers) {
+ if (si.brokerID != null) {
+ sisToRemove.add(si);
+ }
+ }
+ for(SubscriberInfo si : sisToRemove) {
+ if (!subscribers.remove(si)) {
+ throw new RuntimeException("Can't remove broker subscriber [" + si.address + "]");
+ }
+ }
+ }
+ }
+
+ class SubscriberInfo {
+ public URL address;
+ public String brokerID;
+
+ public SubscriberInfo(URL address) {
+ this.address = address;
+ this.brokerID = null;
+ }
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java
new file mode 100644
index 0000000000..7748fdffe9
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationServiceBindingProvider.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletOutputStream;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+import org.apache.tuscany.sca.binding.notification.encoding.BrokerConsumerReference;
+import org.apache.tuscany.sca.binding.notification.encoding.BrokerID;
+import org.apache.tuscany.sca.binding.notification.encoding.ConnectionOverride;
+import org.apache.tuscany.sca.binding.notification.encoding.Constants;
+import org.apache.tuscany.sca.binding.notification.encoding.ConsumerReference;
+import org.apache.tuscany.sca.binding.notification.encoding.EncodingRegistry;
+import org.apache.tuscany.sca.binding.notification.encoding.EncodingUtils;
+import org.apache.tuscany.sca.binding.notification.encoding.EndpointAddress;
+import org.apache.tuscany.sca.binding.notification.encoding.EndpointReference;
+import org.apache.tuscany.sca.binding.notification.encoding.ReferenceProperties;
+import org.apache.tuscany.sca.binding.notification.encoding.Subscribe;
+import org.apache.tuscany.sca.binding.notification.util.IOUtils;
+import org.apache.tuscany.sca.binding.notification.util.NotificationServlet;
+import org.apache.tuscany.sca.binding.notification.util.URIUtil;
+import org.apache.tuscany.sca.binding.notification.util.IOUtils.IOUtilsException;
+import org.apache.tuscany.sca.binding.notification.util.IOUtils.Writeable;
+import org.apache.tuscany.sca.binding.notification.util.NotificationServlet.NotificationServletStreamHandler;
+import org.apache.tuscany.sca.core.invocation.MessageImpl;
+import org.apache.tuscany.sca.host.http.ServletHost;
+import org.apache.tuscany.sca.interfacedef.Interface;
+import org.apache.tuscany.sca.interfacedef.InterfaceContract;
+import org.apache.tuscany.sca.interfacedef.Operation;
+import org.apache.tuscany.sca.invocation.InvocationChain;
+import org.apache.tuscany.sca.invocation.Message;
+import org.apache.tuscany.sca.provider.ServiceBindingProvider;
+import org.apache.tuscany.sca.runtime.RuntimeComponent;
+import org.apache.tuscany.sca.runtime.RuntimeComponentService;
+import org.apache.tuscany.sca.runtime.RuntimeWire;
+
+/**
+ * The runtime representaion of the local service binding
+ *
+ * @version $Rev$ $Date$
+ */
+public class NotificationServiceBindingProvider
+ implements ServiceBindingProvider, NotificationServletStreamHandler {
+
+ private RuntimeWire wire;
+ private NotificationBinding notificationBinding;
+ private RuntimeComponentService service;
+ private ServletHost servletHost;
+ private NotificationTypeManager ntm;
+ private EncodingRegistry encodingRegistry;
+ private URI notificationType;
+ private URL myUrl;
+ private URL remoteNtmUrl;
+ private static final String consumerPathBase = "/consumer";
+ private boolean started;
+ private NotificationBrokerManager brokerManager;
+ private String brokerID;
+
+ public NotificationServiceBindingProvider(NotificationBinding notificationBinding,
+ RuntimeComponent component,
+ RuntimeComponentService service,
+ ServletHost servletHost,
+ NotificationTypeManager ntm,
+ EncodingRegistry encodingRegistry,
+ String httpUrl,
+ NotificationBrokerManager brokerManager) {
+ this.notificationBinding = notificationBinding;
+ this.service = service;
+ this.servletHost = servletHost;
+ this.ntm = ntm;
+ this.encodingRegistry = encodingRegistry;
+ this.notificationType = notificationBinding.getNotificationType();
+ String ntmAddress = notificationBinding.getNtmAddress();
+ String notificationTypePath = URIUtil.getPath(notificationType);
+ try {
+ this.myUrl = new URL(httpUrl + consumerPathBase + notificationTypePath);
+ remoteNtmUrl = null;
+ if (ntmAddress != null && notificationType != null) {
+ remoteNtmUrl = new URL(ntmAddress + notificationTypePath);
+ }
+ }
+ catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ this.started = false;
+ this.brokerManager = brokerManager;
+ this.brokerID = null;
+
+ URI uri = URI.create(component.getURI() + "/" + notificationBinding.getName());
+ notificationBinding.setURI(uri.toString());
+ Interface interfaze = service.getInterfaceContract().getInterface();
+ interfaze.resetDataBinding(OMElement.class.getName());
+ for (Operation operation : interfaze.getOperations()) {
+ operation.setNonBlocking(false);
+ }
+ }
+
+ public NotificationBinding getBinding() {
+ return notificationBinding;
+ }
+
+ public boolean isStarted() {
+ return started;
+ }
+
+ public URL getURL() {
+ return myUrl;
+ }
+
+ public InterfaceContract getBindingInterfaceContract() {
+ return service.getInterfaceContract();
+ }
+
+ public boolean supportsOneWayInvocation() {
+ return false;
+ }
+
+ public void start() {
+ if (started) {
+ return;
+ }
+
+ RuntimeComponentService componentService = (RuntimeComponentService) service;
+ wire = componentService.getRuntimeWire(notificationBinding);
+
+ for (InvocationChain ch : wire.getInvocationChains()) {
+ ch.setAllowsPassByReference(true);
+ }
+
+ brokerManager.serviceProviderStarted(notificationType, this, remoteNtmUrl);
+ started = true;
+ }
+
+ public void stop() {
+ }
+
+ public void deployConsumer() {
+ WriteableSubscribe ws = new WriteableSubscribe(myUrl, null);
+ List<URL> producerList = new ArrayList<URL>();
+ String sequenceType = ntm.newConsumer(notificationType, myUrl, remoteNtmUrl, producerList);
+ if (Constants.EndProducers.equals(sequenceType)) {
+ for (URL producerUrl : producerList) {
+ subscribeWithProducer(producerUrl, null, ws);
+ }
+ }
+ else if (Constants.BrokerProducers.equals(sequenceType)) {
+ // Pick a broker producer, for now the first one
+ URL producerUrl = producerList.get(0);
+ subscribeWithProducer(producerUrl, null, ws);
+ }
+
+ servletHost.addServletMapping(myUrl.toString(), new NotificationServlet(this));
+ }
+
+ protected void subscribeWithProducer(URL producerUrl, String brokerID, WriteableSubscribe ws) {
+ if (ws == null) {
+ ws = new WriteableSubscribe(myUrl, brokerID);
+ }
+ try {
+ IOUtils.sendHttpRequest(producerUrl, Constants.SUBSCRIBE_OP, ws, null);
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void deployBroker(String brokerID, EndpointReference brokerProducerEPR, List<EndpointReference> producerList) {
+ if (brokerProducerEPR != null) {
+ subscribeWithProducer(brokerProducerEPR.getEndpointAddress().getAddress(), brokerID, null);
+ }
+ this.brokerID = brokerID;
+ if (producerList != null && !producerList.isEmpty()) {
+ WriteableConnectionOverride wco = new WriteableConnectionOverride(myUrl, brokerID);
+ for (EndpointReference producerEPR : producerList) {
+ try {
+ IOUtils.sendHttpRequest(producerEPR.getEndpointAddress().getAddress(), Constants.CONNECTION_OVERRIDE_OP, wco, null);
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ servletHost.addServletMapping(myUrl.toString(), new NotificationServlet(this));
+ }
+
+ public void replaceBrokerConnection(EndpointReference chosenBrokerProducerEpr) {
+ if (brokerID == null) {
+ throw new RuntimeException("Missing broker id");
+ }
+ URL producerUrl = chosenBrokerProducerEpr.getEndpointAddress().getAddress();
+ subscribeWithProducer(producerUrl, brokerID, null);
+ }
+
+ public void handle(Map<String, String> headers, ServletInputStream istream, int contentLength, ServletOutputStream ostream) {
+ String opHeader = headers.get(IOUtils.Notification_Operation);
+ String incomingBrokerID = headers.get(Constants.Broker_ID);
+ if (opHeader == null) {
+ throw new RuntimeException("Missing operation header");
+ }
+ if (wire == null) {
+ throw new RuntimeException("Missing wire");
+ }
+ InvocationChain chain = null;
+ for (InvocationChain ch : wire.getInvocationChains()) {
+ // We may want to use more than just the op name
+ if(ch.getTargetOperation().getName().equals(opHeader)) {
+ chain = ch;
+ break;
+ }
+ }
+ if (chain == null) {
+ throw new RuntimeException("Can't find invocation chain match for [" + opHeader + "]");
+ }
+ byte[] payload = null;
+ try {
+ payload = IOUtils.readFully(istream, contentLength);
+ }
+ catch(IOException e) {
+ throw new RuntimeException(e);
+ }
+ Object[] args = getArgsFromByteArray(payload, incomingBrokerID);
+
+ invoke(chain, args);
+
+ // Doing nothing to ostream is equivalent to returning null
+ }
+
+ private Object[] getArgsFromByteArray(byte[] payload, String incomingBrokerID) {
+ try {
+ StAXOMBuilder builder = new StAXOMBuilder(new ByteArrayInputStream(payload));
+ OMElement element = builder.getDocumentElement();
+ return new Object[] { element };
+ }
+ catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected void invoke(InvocationChain chain, Object[] args) {
+ Message msg = new MessageImpl();
+ msg.setBody(args);
+ chain.getHeadInvoker().invoke(msg);
+ }
+
+ class WriteableSubscribe implements Writeable {
+
+ private Subscribe sub;
+
+ public WriteableSubscribe(URL url, String brokerID) {
+ EndpointAddress epa = new EndpointAddress();
+ epa.setAddress(url);
+ EndpointReference epr = new EndpointReference();
+ epr.setEndpointAddress(epa);
+ if (brokerID != null) {
+ BrokerID cbi = new BrokerID();
+ cbi.setID(brokerID);
+ ReferenceProperties crp = new ReferenceProperties();
+ crp.addProperty(cbi);
+ epr.setReferenceProperties(crp);
+ }
+ ConsumerReference cr = new ConsumerReference();
+ cr.setReference(epr);
+ sub = new Subscribe();
+ sub.setConsumerReference(cr);
+ }
+
+ public void write(OutputStream os) throws IOUtilsException {
+ EncodingUtils.encodeToStream(encodingRegistry, sub, os);
+ }
+ }
+
+ class WriteableConnectionOverride implements Writeable {
+
+ private ConnectionOverride connectionOverride;
+
+ public WriteableConnectionOverride(URL brokerConsumerUrl, String brokerID) {
+ EndpointAddress epa = new EndpointAddress();
+ epa.setAddress(brokerConsumerUrl);
+ EndpointReference brokerConsumerEPR = new EndpointReference();
+ brokerConsumerEPR.setEndpointAddress(epa);
+ BrokerID cbi = new BrokerID();
+ cbi.setID(brokerID);
+ ReferenceProperties crp = new ReferenceProperties();
+ crp.addProperty(cbi);
+ brokerConsumerEPR.setReferenceProperties(crp);
+ BrokerConsumerReference brokerConsumerReference = new BrokerConsumerReference();
+ brokerConsumerReference.setReference(brokerConsumerEPR);
+ connectionOverride = new ConnectionOverride();
+ connectionOverride.setBrokerConsumerReference(brokerConsumerReference);
+ }
+
+ public void write(OutputStream os) throws IOUtilsException {
+ EncodingUtils.encodeToStream(encodingRegistry, connectionOverride, os);
+ }
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManager.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManager.java
new file mode 100644
index 0000000000..81f87dc4da
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManager.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification;
+
+import java.net.URI;
+import java.net.URL;
+import java.util.List;
+
+import org.apache.tuscany.sca.binding.notification.encoding.EndpointReference;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public interface NotificationTypeManager {
+
+ String newConsumer(URI notificationType, URL consumerUrl, URL remoteNtmUrl, List<URL> producerList);
+ String newProducer(URI notificationType, URL producerUrl, URL remoteNtmUrl, List<URL> consumerList);
+ boolean newBroker(URI notificationType,
+ URL consumerUrl,
+ URL producerUrl,
+ String brokerID,
+ URL remoteNtmUrl,
+ List<EndpointReference> consumerList,
+ List<EndpointReference> producerList);
+ void newBrokerAck(URL remoteNtmUrl);
+ void removeBroker(EndpointReference brokerConsumerEpr, List<EndpointReference> neighborBrokerConsumerEprs, URL remoteNtmUrl);
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManagerImpl.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManagerImpl.java
new file mode 100644
index 0000000000..6e61f82042
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/NotificationTypeManagerImpl.java
@@ -0,0 +1,692 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletOutputStream;
+
+import org.apache.tuscany.sca.binding.notification.encoding.Broker;
+import org.apache.tuscany.sca.binding.notification.encoding.BrokerConsumerReference;
+import org.apache.tuscany.sca.binding.notification.encoding.BrokerID;
+import org.apache.tuscany.sca.binding.notification.encoding.BrokerProducerReference;
+import org.apache.tuscany.sca.binding.notification.encoding.Brokers;
+import org.apache.tuscany.sca.binding.notification.encoding.Constants;
+import org.apache.tuscany.sca.binding.notification.encoding.EncodingException;
+import org.apache.tuscany.sca.binding.notification.encoding.EncodingObject;
+import org.apache.tuscany.sca.binding.notification.encoding.EncodingRegistry;
+import org.apache.tuscany.sca.binding.notification.encoding.EncodingUtils;
+import org.apache.tuscany.sca.binding.notification.encoding.EndConsumers;
+import org.apache.tuscany.sca.binding.notification.encoding.EndProducers;
+import org.apache.tuscany.sca.binding.notification.encoding.EndpointAddress;
+import org.apache.tuscany.sca.binding.notification.encoding.EndpointReference;
+import org.apache.tuscany.sca.binding.notification.encoding.EndpointReferenceWrapper;
+import org.apache.tuscany.sca.binding.notification.encoding.NeighborBrokerConsumers;
+import org.apache.tuscany.sca.binding.notification.encoding.Neighbors;
+import org.apache.tuscany.sca.binding.notification.encoding.NewBroker;
+import org.apache.tuscany.sca.binding.notification.encoding.NewBrokerAck;
+import org.apache.tuscany.sca.binding.notification.encoding.NewBrokerResponse;
+import org.apache.tuscany.sca.binding.notification.encoding.NewConsumer;
+import org.apache.tuscany.sca.binding.notification.encoding.NewConsumerResponse;
+import org.apache.tuscany.sca.binding.notification.encoding.NewProducer;
+import org.apache.tuscany.sca.binding.notification.encoding.NewProducerResponse;
+import org.apache.tuscany.sca.binding.notification.encoding.RemoveBroker;
+import org.apache.tuscany.sca.binding.notification.encoding.RemovedBroker;
+import org.apache.tuscany.sca.binding.notification.encoding.ReplaceBrokerConnection;
+import org.apache.tuscany.sca.binding.notification.util.IOUtils;
+import org.apache.tuscany.sca.binding.notification.util.NotificationServlet;
+import org.apache.tuscany.sca.binding.notification.util.URIUtil;
+import org.apache.tuscany.sca.binding.notification.util.IOUtils.IOUtilsException;
+import org.apache.tuscany.sca.binding.notification.util.IOUtils.ReadableContinuation;
+import org.apache.tuscany.sca.binding.notification.util.IOUtils.Writeable;
+import org.apache.tuscany.sca.binding.notification.util.NotificationServlet.NotificationServletStreamHandler;
+import org.apache.tuscany.sca.host.http.ServletHost;
+
+/**
+ * A notification type manager serves as a registry of producers and consumers, or brokers, for
+ * any notification type. This class implements an interface that allows a reference provider
+ * (a producer), a service provider (a consumer), or both (a broker, via the provider factory),
+ * to access locally the ntm for its notification type, regardless of whether the ntm resides
+ * locally or remotely.
+ * At a given host there is only one reference provider and/or one service provider for any given
+ * notification type. So, if the ntm for a notification type resides locally, then it is invoked
+ * exclusively by either a reference provider (newProducer), a service provider (newConsumer), or
+ * a provider factory (newBroker). And since these invocations occur when the providers are being
+ * created then all three of consumerLists, producerLists and brokerLists must be null when these
+ * invocations occur.
+ *
+ * @version $Rev$ $Date$
+ */
+public class NotificationTypeManagerImpl implements NotificationTypeManager {
+
+ private static final String ntmPathBase = "/ntm";
+
+ private ServletHost servletHost;
+ private EncodingRegistry encodingRegistry;
+ private Map<URI, NotificationTypeManagerHandler> ntmHandlers;
+
+ public NotificationTypeManagerImpl() {
+ }
+
+ public void setServletHost(ServletHost servletHost) {
+ this.servletHost = servletHost;
+ }
+
+ public void setEncodingRegistry(EncodingRegistry encodingRegistry) {
+ this.encodingRegistry = encodingRegistry;
+ }
+
+ public void init() {
+ ntmHandlers = new HashMap<URI, NotificationTypeManagerHandler>();
+ }
+
+ public String newConsumer(URI notificationType, URL consumerUrl, URL remoteNtmUrl, List<URL> producerListResult) {
+ if (ntmUrlIsRemote(consumerUrl, remoteNtmUrl)) {
+ try {
+ WriteableEPW wEPW = new WriteableEPW(new NewConsumer(), consumerUrl);
+ InputStreamDecoder isd = new InputStreamDecoder();
+ NewConsumerResponse ncr =
+ (NewConsumerResponse)IOUtils.sendHttpRequest(remoteNtmUrl, Constants.NEW_CONSUMER_OP, wEPW, isd);
+ String sequenceType = ncr.getSequenceType();
+ if (Constants.EndProducers.equals(sequenceType) || Constants.BrokerProducers.equals(sequenceType)) {
+ for (EndpointReference epr : ncr.getReferenceSequence()) {
+ producerListResult.add(epr.getEndpointAddress().getAddress());
+ }
+ }
+ return sequenceType;
+ } catch(Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+ else {
+ NotificationTypeManagerHandler ntmHandler = ntmHandlers.get(notificationType);
+ if (ntmHandler != null) {
+ throw new RuntimeException("Trying to deploy local consumer with existing local producer, consumer or broker");
+ }
+
+ createNtmHandler(consumerUrl.getAuthority(), notificationType, consumerUrl, null, null);
+
+ return Constants.NoProducers;
+ }
+ }
+
+ private void createNtmHandler(String ntmUriAuthority, URI notificationType, URL consumerUrl, URL producerUrl, BrokerStruct broker) {
+ String ntmUri = "http://" + ntmUriAuthority + ntmPathBase + URIUtil.getPath(notificationType);
+ NotificationTypeManagerHandler ntmh = new NotificationTypeManagerHandler(notificationType, consumerUrl, producerUrl, broker);
+ ntmHandlers.put(notificationType, ntmh);
+ servletHost.addServletMapping(ntmUri, new NotificationServlet(ntmh));
+ }
+
+ public String newProducer(URI notificationType, URL producerUrl, URL remoteNtmUrl, List<URL> consumerListResult) {
+ if (ntmUrlIsRemote(producerUrl, remoteNtmUrl)) {
+ try {
+ WriteableEPW wEPW = new WriteableEPW(new NewProducer(), producerUrl);
+ InputStreamDecoder isd = new InputStreamDecoder();
+ NewProducerResponse npr =
+ (NewProducerResponse)IOUtils.sendHttpRequest(remoteNtmUrl, Constants.NEW_PRODUCER_OP, wEPW, isd);
+ String sequenceType = npr.getSequenceType();
+ if (Constants.EndConsumers.equals(sequenceType) || Constants.BrokerConsumers.equals(sequenceType)) {
+ for (EndpointReference epr : npr.getReferenceSequence()) {
+ consumerListResult.add(epr.getEndpointAddress().getAddress());
+ }
+ }
+ return sequenceType;
+ } catch(Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+ else {
+ NotificationTypeManagerHandler ntmHandler = ntmHandlers.get(notificationType);
+ if (ntmHandler != null) {
+ throw new RuntimeException("Trying to deploy local producer with existing local producer, consumer or broker");
+ }
+
+ createNtmHandler(producerUrl.getAuthority(), notificationType, null, producerUrl, null);
+
+ return Constants.NoConsumers;
+ }
+ }
+
+ public boolean newBroker(URI notificationType,
+ URL consumerUrl,
+ URL producerUrl,
+ String brokerID,
+ URL remoteNtmUrl,
+ List<EndpointReference> consumerListResult,
+ List<EndpointReference> producerListResult) {
+ String ntmUriAuthority = producerUrl.getAuthority();
+ if (!ntmUriAuthority.equals(consumerUrl.getAuthority())) {
+ throw new RuntimeException("Producer url and consumer url do not match");
+ }
+ if (ntmUrlIsRemote(producerUrl, remoteNtmUrl)) {
+ try {
+ WriteableNewBroker wnb = new WriteableNewBroker(consumerUrl, producerUrl, brokerID);
+ InputStreamDecoder isd = new InputStreamDecoder();
+ NewBrokerResponse nbr =
+ (NewBrokerResponse)IOUtils.sendHttpRequest(remoteNtmUrl, Constants.NEW_BROKER_OP, wnb, isd);
+ if (nbr.isFirstBroker()) {
+ if (nbr.getEndConsumers().getSequenceType().equals(Constants.EndConsumers)) {
+ for (EndpointReference epr : nbr.getEndConsumers().getReferenceSequence()) {
+ consumerListResult.add(epr);
+ }
+ }
+ if (nbr.getEndProducers().getSequenceType().equals(Constants.EndProducers)) {
+ for (EndpointReference epr : nbr.getEndProducers().getReferenceSequence()) {
+ producerListResult.add(epr);
+ }
+ }
+ }
+ else {
+ for (Broker broker : nbr.getBrokers().getBrokerSequence()) {
+ consumerListResult.add(broker.getBrokerConsumerReference().getReference());
+ producerListResult.add(broker.getBrokerProducerReference().getReference());
+ }
+ }
+ return nbr.isFirstBroker();
+ } catch(Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+ else {
+ NotificationTypeManagerHandler ntmHandler = ntmHandlers.get(notificationType);
+ if (ntmHandler != null) {
+ throw new RuntimeException("Trying to deploy local broker with existing local producer, consumer or broker");
+ }
+
+ BrokerStruct broker = new BrokerStruct(consumerUrl, producerUrl, brokerID);
+ createNtmHandler(ntmUriAuthority, notificationType, null, null, broker);
+
+ return true;
+ }
+ }
+
+ private boolean ntmUrlIsRemote(URL localUrl, URL ntmUrl) {
+ if (ntmUrl == null) {
+ return false;
+ }
+ if (localUrl.getPort() != ntmUrl.getPort()) {
+ return true;
+ }
+ String remoteNtmUrlAuthority = ntmUrl.getAuthority();
+ if (remoteNtmUrlAuthority.indexOf("localhost") >= 0) {
+ return false;
+ }
+ return !localUrl.getAuthority().equals(remoteNtmUrlAuthority);
+ }
+
+ public void newBrokerAck(URL remoteNtmUrl) {
+ try {
+ IOUtils.sendHttpRequest(remoteNtmUrl, Constants.NEW_BROKER_ACK_OP, new WriteableNewBrokerAck(), null);
+ } catch(Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void removeBroker(EndpointReference brokerConsumerEpr, List<EndpointReference> neighborBrokerConsumerEprs, URL remoteNtmUrl) {
+ WriteableRemoveBroker wrb = new WriteableRemoveBroker(brokerConsumerEpr, neighborBrokerConsumerEprs);
+
+ try {
+ IOUtils.sendHttpRequest(remoteNtmUrl, Constants.REMOVE_BROKER_OP, wrb, null);
+ } catch(Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ private class NotificationTypeManagerHandler implements NotificationServletStreamHandler {
+
+ private URI notificationType;
+ List<URL> consumerList;
+ List<URL> producerList;
+ List<BrokerStruct> brokerList;
+ private NotificationTypeLock notificationTypeLock;
+ private BrokerStruct pendingBroker;
+
+ public NotificationTypeManagerHandler(URI notificationType) {
+ this.notificationType = notificationType;
+ this.notificationTypeLock = new NotificationTypeLock();
+ this.pendingBroker = null;
+ }
+
+ public NotificationTypeManagerHandler(URI notificationType, URL consumerUrl, URL producerUrl, BrokerStruct broker) {
+ this(notificationType);
+ if (consumerUrl != null) {
+ addConsumer(consumerUrl);
+ }
+ else if (producerUrl != null) {
+ addProducer(producerUrl);
+ }
+ else if (broker != null) {
+ addBroker(broker);
+ }
+ }
+
+ private void addConsumer(URL consumerUrl) {
+ if (consumerList == null) {
+ consumerList = new ArrayList<URL>();
+ }
+ consumerList.add(consumerUrl);
+ }
+
+ private void addProducer(URL producerUrl) {
+ if (producerList == null) {
+ producerList = new ArrayList<URL>();
+ }
+ producerList.add(producerUrl);
+ }
+
+ private void addBroker(BrokerStruct broker) {
+ if (brokerList == null) {
+ brokerList = new ArrayList<BrokerStruct>();
+ }
+ brokerList.add(broker);
+ }
+
+ public void handle(Map<String, String> headers, ServletInputStream istream, int contentLength, ServletOutputStream ostream) {
+ String opHeader = headers.get(IOUtils.Notification_Operation);
+ EncodingObject eo = null;
+ try {
+ eo = EncodingUtils.decodeFromStream(encodingRegistry, istream);
+ }
+ catch(EncodingException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (Constants.NEW_CONSUMER_OP.equals(opHeader)) {
+ handleNewConsumer((NewConsumer)eo, ostream);
+ }
+ else if(Constants.NEW_PRODUCER_OP.equals(opHeader)) {
+ handleNewProducer((NewProducer)eo, ostream);
+ }
+ else if(Constants.NEW_BROKER_OP.equals(opHeader)) {
+ handleNewBroker((NewBroker)eo, ostream);
+ }
+ else if (Constants.NEW_BROKER_ACK_OP.equals(opHeader)) {
+ handleNewBrokerAck();
+ }
+ else if (Constants.REMOVE_BROKER_OP.equals(opHeader)) {
+ handleRemoveBroker((RemoveBroker)eo);
+ }
+ }
+
+ private void handleNewConsumer(NewConsumer nc, ServletOutputStream ostream) {
+ synchronized(notificationTypeLock) {
+ if (notificationTypeLock.isLocked) {
+ try { notificationTypeLock.wait(); } catch(InterruptedException e) {}
+ }
+ URL consumerUrl = nc.getReference().getEndpointAddress().getAddress();
+ if (brokerList == null) {
+ addConsumer(consumerUrl);
+ }
+
+ NewConsumerResponse ncr = new NewConsumerResponse();
+ if (producerList != null) {
+ ncr.setSequenceType(Constants.EndProducers);
+ for (URL producerUrl : producerList) {
+ ncr.addReferenceToSequence(EncodingUtils.createEndpointReference(producerUrl, null));
+ }
+ }
+ else if(brokerList != null) {
+ ncr.setSequenceType(Constants.BrokerProducers);
+ for (BrokerStruct broker : brokerList) {
+ ncr.addReferenceToSequence(EncodingUtils.createEndpointReference(broker.producerUrl, null));
+ }
+ }
+ else {
+ ncr.setSequenceType(Constants.NoProducers);
+ }
+ try {
+ EncodingUtils.encodeToStream(encodingRegistry, ncr, ostream);
+ }
+ catch(IOUtilsException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private void handleNewProducer(NewProducer np, ServletOutputStream ostream) {
+ synchronized(notificationTypeLock) {
+ if (notificationTypeLock.isLocked) {
+ try { notificationTypeLock.wait(); } catch(InterruptedException e) {}
+ }
+ URL producerUrl = np.getReference().getEndpointAddress().getAddress();
+ if (brokerList == null) {
+ addProducer(producerUrl);
+ }
+
+ NewProducerResponse npr = new NewProducerResponse();
+ if (consumerList != null) {
+ npr.setSequenceType(Constants.EndConsumers);
+ for (URL consumerUrl : consumerList) {
+ npr.addReferenceToSequence(EncodingUtils.createEndpointReference(consumerUrl, null));
+ }
+ }
+ else if(brokerList != null) {
+ npr.setSequenceType(Constants.BrokerConsumers);
+ for (BrokerStruct broker : brokerList) {
+ npr.addReferenceToSequence(EncodingUtils.createEndpointReference(broker.consumerUrl, null));
+ }
+ }
+ else {
+ npr.setSequenceType(Constants.NoConsumers);
+ }
+ try {
+ EncodingUtils.encodeToStream(encodingRegistry, npr, ostream);
+ }
+ catch(IOUtilsException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private void handleNewBroker(NewBroker nb, ServletOutputStream ostream) {
+ synchronized(notificationTypeLock) {
+ if (notificationTypeLock.isLocked) {
+ try { notificationTypeLock.wait(); } catch(InterruptedException e) {}
+ }
+ NewBrokerResponse nbr = new NewBrokerResponse();
+ if (consumerList != null || producerList != null || brokerList == null) {
+ nbr.setFirstBroker(true);
+ EndConsumers endConsumers = new EndConsumers();
+ if (consumerList != null) {
+ endConsumers.setSequenceType(Constants.EndConsumers);
+ for (URL consumerUrl : consumerList) {
+ endConsumers.addReferenceToSequence(EncodingUtils.createEndpointReference(consumerUrl, null));
+ }
+ }
+ else {
+ endConsumers.setSequenceType(Constants.NoConsumers);
+ }
+ nbr.setEndConsumers(endConsumers);
+ EndProducers endProducers = new EndProducers();
+ if (producerList != null) {
+ endProducers.setSequenceType(Constants.EndProducers);
+ for (URL producerUrl : producerList) {
+ endProducers.addReferenceToSequence(EncodingUtils.createEndpointReference(producerUrl, null));
+ }
+ }
+ else {
+ endProducers.setSequenceType(Constants.NoProducers);
+ }
+ nbr.setEndProducers(endProducers);
+ }
+ else {
+ nbr.setFirstBroker(false);
+ Brokers brokers = new Brokers();
+ for (BrokerStruct brokerStruct : brokerList) {
+ Broker brokerElt = new Broker();
+ BrokerConsumerReference bcr = new BrokerConsumerReference();
+ bcr.setReference(EncodingUtils.createEndpointReference(brokerStruct.consumerUrl, brokerStruct.brokerID));
+ brokerElt.setBrokerConsumerReference(bcr);
+
+ BrokerProducerReference bpr = new BrokerProducerReference();
+ bpr.setReference(EncodingUtils.createEndpointReference(brokerStruct.producerUrl, brokerStruct.brokerID));
+ brokerElt.setBrokerProducerReference(bpr);
+ brokers.addBrokerToSequence(brokerElt);
+ }
+ nbr.setBrokers(brokers);
+ }
+ EndpointReference consumerEPR = nb.getBrokerConsumerReference().getReference();
+ URL consumerUrl = consumerEPR.getEndpointAddress().getAddress();
+ BrokerID consumerBrokerID = consumerEPR.getReferenceProperties().getProperty(BrokerID.class);
+ EndpointReference producerEPR = nb.getBrokerProducerReference().getReference();
+ URL producerUrl = producerEPR.getEndpointAddress().getAddress();
+ BrokerID producerBrokerID = producerEPR.getReferenceProperties().getProperty(BrokerID.class);
+ if (consumerBrokerID == null ||
+ producerBrokerID == null ||
+ !consumerBrokerID.getID().equals(producerBrokerID.getID())) {
+ throw new RuntimeException("Producer and consumer broker ids do not match");
+ }
+ // only add broker if consumerList == null && producerList == null
+ // otherwise, make it a pending broker and wait for ack
+ // TODO block for a configurable amount of time
+ BrokerStruct broker = new BrokerStruct(consumerUrl, producerUrl, consumerBrokerID.getID());
+ if (consumerList == null && producerList == null) {
+ addBroker(broker);
+ }
+ else {
+ pendingBroker = broker;
+ notificationTypeLock.isLocked = true;
+ }
+ try {
+ EncodingUtils.encodeToStream(encodingRegistry, nbr, ostream);
+ }
+ catch(IOUtilsException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private void handleNewBrokerAck() {
+ synchronized(notificationTypeLock) {
+ if (!notificationTypeLock.isLocked) {
+ notificationTypeLock.notifyAll();
+ throw new RuntimeException("Notification type should be locked");
+ }
+ if (brokerList != null) {
+ notificationTypeLock.isLocked = false;
+ notificationTypeLock.notifyAll();
+ throw new RuntimeException("Can't add pending broker to non-empty broker list");
+ }
+ if (pendingBroker == null) {
+ notificationTypeLock.isLocked = false;
+ notificationTypeLock.notifyAll();
+ throw new RuntimeException("Missing pending broker");
+ }
+ addBroker(pendingBroker);
+ consumerList = null;
+ producerList = null;
+ pendingBroker = null;
+ notificationTypeLock.isLocked = false;
+ notificationTypeLock.notifyAll();
+ }
+ }
+
+ private void handleRemoveBroker(RemoveBroker rb) {
+ synchronized(notificationTypeLock) {
+ if (notificationTypeLock.isLocked) {
+ try { notificationTypeLock.wait(); } catch(InterruptedException e) {}
+ }
+
+ if (brokerList == null) {
+ throw new RuntimeException("No broker to remove for [" + notificationType + "]");
+ }
+
+ NeighborBrokerConsumers nbcs = rb.getNeighborBrokerConsumers();
+ EndpointReference rbEpr = rb.getBrokerConsumerReference().getReference();
+ if (nbcs != null && nbcs.getReferenceSequence() != null) {
+ List<Broker> neighborBrokers = new ArrayList<Broker>();
+ for (EndpointReference neighborBrokerConsumerEpr : nbcs.getReferenceSequence()) {
+ BrokerStruct neighborBrokerStruct = null;
+ URL neighborBrokerConsumerEprUrl = neighborBrokerConsumerEpr.getEndpointAddress().getAddress();
+ for (BrokerStruct brokerStruct : brokerList) {
+ if (brokerStruct.consumerUrl.equals(neighborBrokerConsumerEprUrl)) {
+ neighborBrokerStruct = brokerStruct;
+ break;
+ }
+ }
+ if (neighborBrokerStruct == null) {
+ throw new RuntimeException("Can't find neighbor broker for consumer EPR [" +
+ neighborBrokerConsumerEprUrl + "]");
+ }
+ BrokerConsumerReference bcr = new BrokerConsumerReference();
+ bcr.setReference(EncodingUtils.createEndpointReference(neighborBrokerStruct.consumerUrl, neighborBrokerStruct.brokerID));
+ BrokerProducerReference bpr = new BrokerProducerReference();
+ bpr.setReference(EncodingUtils.createEndpointReference(neighborBrokerStruct.producerUrl, neighborBrokerStruct.brokerID));
+ Broker neighborBroker = new Broker();
+ neighborBroker.setBrokerConsumerReference(bcr);
+ neighborBroker.setBrokerProducerReference(bpr);
+ neighborBrokers.add(neighborBroker);
+ }
+ int lastIndex = neighborBrokers.size() - 1;
+ for (int index = lastIndex; index >= 0; index--) {
+ List<Broker> writeableNeighborBrokers = ((index > 0) ? neighborBrokers.subList(0, index) : null);
+ WriteableReplaceBrokerConnection wrbc = new WriteableReplaceBrokerConnection(rbEpr, writeableNeighborBrokers);
+ URL targetUrl =
+ neighborBrokers.get(index).getBrokerProducerReference().getReference().getEndpointAddress().getAddress();
+ try {
+ IOUtils.sendHttpRequest(targetUrl, Constants.REPLACE_BROKER_CONNECTION_OP, wrbc, null);
+ } catch(Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ BrokerStruct removedBrokerStruct = null;
+ URL rbEprUrl = rbEpr.getEndpointAddress().getAddress();
+ for (BrokerStruct brokerSruct : brokerList) {
+ if (brokerSruct.consumerUrl.equals(rbEprUrl)) {
+ removedBrokerStruct = brokerSruct;
+ break;
+ }
+ }
+ if (removedBrokerStruct == null) {
+ throw new RuntimeException("Can't find broker to remove for EPR [" + rbEprUrl + "]");
+ }
+ if(!brokerList.remove(removedBrokerStruct)) {
+ throw new RuntimeException("Broker was not removed");
+ }
+ }
+ }
+ }
+
+ class NotificationTypeLock {
+ public boolean isLocked;
+ }
+
+ class WriteableEPW implements Writeable {
+ private EndpointReferenceWrapper epw;
+
+ public WriteableEPW(EndpointReferenceWrapper epw, URL url) {
+ EndpointAddress epa = new EndpointAddress();
+ epa.setAddress(url);
+ EndpointReference epr = new EndpointReference();
+ epr.setEndpointAddress(epa);
+ epw.setReference(epr);
+ this.epw = epw;
+ }
+
+ public void write(OutputStream os) throws IOUtilsException {
+ EncodingUtils.encodeToStream(encodingRegistry, epw, os);
+ }
+ }
+
+ class InputStreamDecoder implements ReadableContinuation {
+
+ public Object read(InputStream istream) throws IOUtilsException {
+ try {
+ return EncodingUtils.decodeFromStream(encodingRegistry, istream);
+ }
+ catch(EncodingException e) {
+ throw new IOUtilsException(e);
+ }
+ }
+ }
+
+ class BrokerStruct {
+ public URL consumerUrl;
+ public URL producerUrl;
+ public String brokerID;
+
+ public BrokerStruct(URL consumerUrl, URL producerUrl, String brokerID) {
+ this.consumerUrl = consumerUrl;
+ this.producerUrl = producerUrl;
+ this.brokerID = brokerID;
+ }
+ }
+
+ class WriteableNewBroker implements Writeable {
+ private NewBroker newBroker;
+
+ public WriteableNewBroker(URL consumerUrl, URL producerUrl, String brokerID) {
+ newBroker = new NewBroker();
+ BrokerConsumerReference bcr = new BrokerConsumerReference();
+ bcr.setReference(EncodingUtils.createEndpointReference(consumerUrl, brokerID));
+ newBroker.setBrokerConsumerReference(bcr);
+
+ BrokerProducerReference bpr = new BrokerProducerReference();
+ bpr.setReference(EncodingUtils.createEndpointReference(producerUrl, brokerID));
+ newBroker.setBrokerProducerReference(bpr);
+ }
+
+ public void write(OutputStream os) throws IOUtilsException {
+ EncodingUtils.encodeToStream(encodingRegistry, newBroker, os);
+ }
+ }
+
+ class WriteableNewBrokerAck implements Writeable {
+ public void write(OutputStream os) throws IOUtilsException {
+ EncodingUtils.encodeToStream(encodingRegistry, new NewBrokerAck(), os);
+ }
+ }
+
+ class WriteableRemoveBroker implements Writeable {
+ private RemoveBroker removeBroker;
+
+ public WriteableRemoveBroker(EndpointReference brokerConsumerEpr, List<EndpointReference> neighborBrokerConsumerEprs) {
+ removeBroker = new RemoveBroker();
+ BrokerConsumerReference brokerConsumerReference = new BrokerConsumerReference();
+ brokerConsumerReference.setReference(brokerConsumerEpr);
+ removeBroker.setBrokerConsumerReference(brokerConsumerReference);
+ if (neighborBrokerConsumerEprs != null) {
+ NeighborBrokerConsumers neighborBrokerConsumers = new NeighborBrokerConsumers();
+ neighborBrokerConsumers.setReferenceSequence(neighborBrokerConsumerEprs);
+ neighborBrokerConsumers.setSequenceType(Constants.BrokerConsumers);
+ removeBroker.setNeighborBrokerConsumers(neighborBrokerConsumers);
+ }
+ }
+
+ public void write(OutputStream os) throws IOUtilsException {
+ EncodingUtils.encodeToStream(encodingRegistry, removeBroker, os);
+ }
+ }
+
+ class WriteableReplaceBrokerConnection implements Writeable {
+ private ReplaceBrokerConnection replaceBrokerConnection;
+
+ public WriteableReplaceBrokerConnection(EndpointReference removedBrokerEpr, List<Broker> brokerSequence) {
+ replaceBrokerConnection = new ReplaceBrokerConnection();
+ RemovedBroker removedBroker = new RemovedBroker();
+ removedBroker.setReference(removedBrokerEpr);
+ replaceBrokerConnection.setRemovedBroker(removedBroker);
+ if (brokerSequence != null) {
+ Neighbors neighbors = new Neighbors();
+ neighbors.setBrokerSequence(brokerSequence);
+ replaceBrokerConnection.setNeighbors(neighbors);
+ }
+ }
+
+ public void write(OutputStream os) throws IOUtilsException {
+ EncodingUtils.encodeToStream(encodingRegistry, replaceBrokerConnection, os);
+ }
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/AbstractBroker.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/AbstractBroker.java
new file mode 100644
index 0000000000..a46ae13763
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/AbstractBroker.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class AbstractBroker implements EncodingObject {
+
+ private BrokerProducerReference brokerProducerReference;
+ private BrokerConsumerReference brokerConsumerReference;
+
+ public BrokerProducerReference getBrokerProducerReference() {
+ return this.brokerProducerReference;
+ }
+
+ public void setBrokerProducerReference(BrokerProducerReference brokerProducerReference) {
+ this.brokerProducerReference = brokerProducerReference;
+ }
+
+ public BrokerConsumerReference getBrokerConsumerReference() {
+ return this.brokerConsumerReference;
+ }
+
+ public void setBrokerConsumerReference(BrokerConsumerReference brokerConsumerReference) {
+ this.brokerConsumerReference = brokerConsumerReference;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/AbstractBrokerEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/AbstractBrokerEnDeCoder.java
new file mode 100644
index 0000000000..8dc5be9b16
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/AbstractBrokerEnDeCoder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import static javax.xml.stream.XMLStreamConstants.END_ELEMENT;
+import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public abstract class AbstractBrokerEnDeCoder<B extends AbstractBroker> extends AbstractEnDeCoder<B> {
+
+ public AbstractBrokerEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+ public void encode(B encodingObject, XMLStreamWriter writer) throws EncodingException {
+
+ try {
+ QName qName = getEncodingObjectQName();
+ writer.writeStartElement(Constants.NOTIFICATION_PREFIX, qName.getLocalPart(), qName.getNamespaceURI());
+ writer.writeNamespace(Constants.NOTIFICATION_PREFIX, qName.getNamespaceURI());
+ registry.encode(encodingObject.getBrokerConsumerReference(), writer);
+ registry.encode(encodingObject.getBrokerProducerReference(), writer);
+ writer.writeEndElement();
+ } catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+ }
+
+ public B decode(XMLStreamReader reader) throws EncodingException {
+
+ try {
+ B brokerElement = getEncodingObjectType().newInstance();
+ boolean haveBCR = false;
+ boolean haveBPR = false;
+ while (true) {
+ switch (reader.next()) {
+ case START_ELEMENT:
+ EncodingObject encodingObject = registry.decode(reader);
+ if (encodingObject instanceof BrokerConsumerReference && !haveBCR) {
+ brokerElement.setBrokerConsumerReference((BrokerConsumerReference)encodingObject);
+ haveBCR = true;
+ }
+ else if(encodingObject instanceof BrokerProducerReference && !haveBPR) {
+ brokerElement.setBrokerProducerReference((BrokerProducerReference)encodingObject);
+ haveBPR = true;
+ }
+ else {
+ throw new EncodingException("Invalid encoding object");
+ }
+ break;
+ case END_ELEMENT:
+ if (!haveBCR) {
+ throw new EncodingException("Missing broker consumer reference");
+ }
+ if (!haveBPR) {
+ throw new EncodingException("Missing broker producer reference");
+ }
+ return brokerElement;
+ }
+ }
+ } catch (Exception ex) {
+ throw new EncodingException(ex);
+ }
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/AbstractEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/AbstractEnDeCoder.java
new file mode 100644
index 0000000000..015ccebaa9
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/AbstractEnDeCoder.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import javax.xml.namespace.QName;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public abstract class AbstractEnDeCoder<E extends EncodingObject> implements
+ EnDeCoder<E> {
+
+ protected EncodingRegistry registry;
+
+ protected AbstractEnDeCoder(EncodingRegistry registry) {
+
+ this.registry = registry;
+ }
+
+ public void start() {
+ Class<E> encodingType = getEncodingObjectType();
+ QName encodingQName = getEncodingObjectQName();
+
+ registry.registerEnDeCoder(encodingType, encodingQName, this);
+ }
+
+ public void stop() {
+ Class<E> encodingType = getEncodingObjectType();
+ QName encodingQName = getEncodingObjectQName();
+
+ registry.unregisterEnDeCoder(encodingType, encodingQName);
+ }
+
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Broker.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Broker.java
new file mode 100644
index 0000000000..41610edc3b
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Broker.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class Broker extends AbstractBroker {
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerConsumerReference.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerConsumerReference.java
new file mode 100644
index 0000000000..e80af5ea68
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerConsumerReference.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class BrokerConsumerReference extends EndpointReferenceWrapper {
+
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerConsumerReferenceEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerConsumerReferenceEnDeCoder.java
new file mode 100644
index 0000000000..7df1c21ca9
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerConsumerReferenceEnDeCoder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import javax.xml.namespace.QName;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class BrokerConsumerReferenceEnDeCoder extends EndpointReferenceWrapperEnDeCoder<BrokerConsumerReference> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.BrokerConsumerReference);
+
+ public BrokerConsumerReferenceEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+
+ public QName getEncodingObjectQName() {
+ return QNAME;
+ }
+
+
+ public Class<BrokerConsumerReference> getEncodingObjectType() {
+ return BrokerConsumerReference.class;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerEnDeCoder.java
new file mode 100644
index 0000000000..a4bee61947
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerEnDeCoder.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import javax.xml.namespace.QName;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class BrokerEnDeCoder extends AbstractBrokerEnDeCoder<Broker> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.Broker);
+
+ public BrokerEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+
+ public QName getEncodingObjectQName() {
+
+ return QNAME;
+ }
+
+
+ public Class<Broker> getEncodingObjectType() {
+
+ return Broker.class;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerID.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerID.java
new file mode 100644
index 0000000000..aec7907441
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerID.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import java.util.UUID;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class BrokerID implements EncodingObject {
+
+ private String id;
+
+ public String getID() {
+ return id;
+ }
+
+ public void setID(String id) {
+ this.id = id;
+ }
+
+ public static String generate() {
+ return UUID.randomUUID().toString();
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerIDEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerIDEnDeCoder.java
new file mode 100644
index 0000000000..68d9340abf
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerIDEnDeCoder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import static javax.xml.stream.XMLStreamConstants.END_ELEMENT;
+import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamConstants;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class BrokerIDEnDeCoder extends AbstractEnDeCoder<BrokerID> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.BrokerID);
+
+ public BrokerIDEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+ public void encode(BrokerID encodingObject, XMLStreamWriter writer) throws EncodingException {
+
+ try {
+ writer.writeStartElement(Constants.NOTIFICATION_PREFIX, QNAME.getLocalPart(), QNAME.getNamespaceURI());
+ writer.writeNamespace(Constants.NOTIFICATION_PREFIX, QNAME.getNamespaceURI());
+ writer.writeCharacters(encodingObject.getID());
+ writer.writeEndElement();
+ } catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+ }
+
+ public BrokerID decode(XMLStreamReader reader) throws EncodingException {
+
+ try {
+ BrokerID brokerIDElement = new BrokerID();
+ while (true) {
+ switch (reader.next()) {
+ case START_ELEMENT:
+ break;
+ case XMLStreamConstants.CHARACTERS:
+ if (reader.hasText()) {
+ String id = reader.getText();
+ brokerIDElement.setID(id);
+ }
+ else {
+ throw new EncodingException("Broker ID missing value");
+ }
+ break;
+ case END_ELEMENT:
+ return brokerIDElement;
+ }
+ }
+ } catch (XMLStreamException ex) {
+ throw new EncodingException(ex);
+ }
+ }
+
+
+ public QName getEncodingObjectQName() {
+
+ return QNAME;
+ }
+
+
+ public Class<BrokerID> getEncodingObjectType() {
+
+ return BrokerID.class;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerProducerReference.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerProducerReference.java
new file mode 100644
index 0000000000..04d7aca0e6
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerProducerReference.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class BrokerProducerReference extends EndpointReferenceWrapper {
+
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerProducerReferenceEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerProducerReferenceEnDeCoder.java
new file mode 100644
index 0000000000..ea66e8f11b
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokerProducerReferenceEnDeCoder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import javax.xml.namespace.QName;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class BrokerProducerReferenceEnDeCoder extends EndpointReferenceWrapperEnDeCoder<BrokerProducerReference> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.BrokerProducerReference);
+
+ public BrokerProducerReferenceEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+
+ public QName getEncodingObjectQName() {
+ return QNAME;
+ }
+
+
+ public Class<BrokerProducerReference> getEncodingObjectType() {
+ return BrokerProducerReference.class;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Brokers.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Brokers.java
new file mode 100644
index 0000000000..9b7eb149e0
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Brokers.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class Brokers implements EncodingObject {
+
+ private List<Broker> brokerSequence;
+
+ public List<Broker> getBrokerSequence() {
+ return brokerSequence;
+ }
+
+ public void addBrokerToSequence(Broker broker) {
+ if(this.brokerSequence == null) {
+ this.brokerSequence = new ArrayList<Broker>();
+ }
+ this.brokerSequence.add(broker);
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokersEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokersEnDeCoder.java
new file mode 100644
index 0000000000..1c3a0aaee8
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/BrokersEnDeCoder.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import static javax.xml.stream.XMLStreamConstants.END_ELEMENT;
+import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class BrokersEnDeCoder extends AbstractEnDeCoder<Brokers> {
+
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.Brokers);
+
+ public BrokersEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+ public void encode(Brokers encodingObject, XMLStreamWriter writer) throws EncodingException {
+
+ try {
+ QName qName = getEncodingObjectQName();
+ writer.writeStartElement(Constants.NOTIFICATION_PREFIX, qName.getLocalPart(), qName.getNamespaceURI());
+ writer.writeNamespace(Constants.NOTIFICATION_PREFIX, qName.getNamespaceURI());
+ if (encodingObject.getBrokerSequence() != null) {
+ for (Broker broker : encodingObject.getBrokerSequence()) {
+ registry.encode(broker, writer);
+ }
+ }
+ writer.writeEndElement();
+ } catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+ }
+
+ public Brokers decode(XMLStreamReader reader) throws EncodingException {
+
+ try {
+ Brokers brokersElement = new Brokers();
+ while (true) {
+ switch (reader.next()) {
+ case START_ELEMENT:
+ EncodingObject encodingObject = registry.decode(reader);
+ brokersElement.addBrokerToSequence((Broker)encodingObject);
+ break;
+ case END_ELEMENT:
+ return brokersElement;
+ }
+ }
+ } catch (XMLStreamException ex) {
+ throw new EncodingException(ex);
+ }
+ }
+
+
+ public QName getEncodingObjectQName() {
+
+ return QNAME;
+ }
+
+
+ public Class<Brokers> getEncodingObjectType() {
+
+ return Brokers.class;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverride.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverride.java
new file mode 100644
index 0000000000..eb1a89812d
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverride.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class ConnectionOverride implements EncodingObject {
+
+ private BrokerConsumerReference brokerConsumerReference;
+
+ public BrokerConsumerReference getBrokerConsumerReference() {
+ return this.brokerConsumerReference;
+ }
+
+ public void setBrokerConsumerReference(BrokerConsumerReference brokerConsumerReference) {
+ this.brokerConsumerReference = brokerConsumerReference;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverrideEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverrideEnDeCoder.java
new file mode 100644
index 0000000000..af20d02c00
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverrideEnDeCoder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import static javax.xml.stream.XMLStreamConstants.END_ELEMENT;
+import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class ConnectionOverrideEnDeCoder extends AbstractEnDeCoder<ConnectionOverride> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.ConnectionOverride);
+
+ public ConnectionOverrideEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+ public void encode(ConnectionOverride encodingObject, XMLStreamWriter writer) throws EncodingException {
+
+ try {
+ writer.writeStartElement(Constants.NOTIFICATION_PREFIX, QNAME.getLocalPart(), QNAME.getNamespaceURI());
+ writer.writeNamespace(Constants.NOTIFICATION_PREFIX, QNAME.getNamespaceURI());
+ registry.encode(encodingObject.getBrokerConsumerReference(), writer);
+ writer.writeEndElement();
+ } catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+ }
+
+ public ConnectionOverride decode(XMLStreamReader reader) throws EncodingException {
+
+ try {
+ ConnectionOverride connectionOverrideElement = new ConnectionOverride();
+ while (true) {
+ switch (reader.next()) {
+ case START_ELEMENT:
+ EncodingObject encodingObject = registry.decode(reader);
+ connectionOverrideElement.setBrokerConsumerReference((BrokerConsumerReference)encodingObject);
+ break;
+ case END_ELEMENT:
+ return connectionOverrideElement;
+ }
+ }
+ } catch (Exception ex) {
+ throw new EncodingException(ex);
+ }
+ }
+
+
+ public QName getEncodingObjectQName() {
+
+ return QNAME;
+ }
+
+
+ public Class<ConnectionOverride> getEncodingObjectType() {
+
+ return ConnectionOverride.class;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverrideResponse.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverrideResponse.java
new file mode 100644
index 0000000000..8dcf9fda58
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverrideResponse.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class ConnectionOverrideResponse implements EncodingObject {
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverrideResponseEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverrideResponseEnDeCoder.java
new file mode 100644
index 0000000000..c5c8a99e5c
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConnectionOverrideResponseEnDeCoder.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import static javax.xml.stream.XMLStreamConstants.END_ELEMENT;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class ConnectionOverrideResponseEnDeCoder extends AbstractEnDeCoder<ConnectionOverrideResponse> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.ConnectionOverrideResponse);
+
+ public ConnectionOverrideResponseEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+ public void encode(ConnectionOverrideResponse encodingObject, XMLStreamWriter writer) throws EncodingException {
+
+ try {
+ writer.writeStartElement(Constants.NOTIFICATION_PREFIX, QNAME.getLocalPart(), QNAME.getNamespaceURI());
+ writer.writeNamespace(Constants.NOTIFICATION_PREFIX, QNAME.getNamespaceURI());
+ writer.writeEndElement();
+ } catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+ }
+
+ public ConnectionOverrideResponse decode(XMLStreamReader reader) throws EncodingException {
+
+ try {
+ ConnectionOverrideResponse connectionOverrideResponseElement = new ConnectionOverrideResponse();
+ while (true) {
+ switch (reader.next()) {
+ case END_ELEMENT:
+ return connectionOverrideResponseElement;
+ }
+ }
+ } catch (Exception ex) {
+ throw new EncodingException(ex);
+ }
+ }
+
+
+ public QName getEncodingObjectQName() {
+
+ return QNAME;
+ }
+
+
+ public Class<ConnectionOverrideResponse> getEncodingObjectType() {
+
+ return ConnectionOverrideResponse.class;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Constants.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Constants.java
new file mode 100644
index 0000000000..e92ffc1d0c
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Constants.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public interface Constants {
+
+ String NOTIFICATION_NS = "http://docs.oasis-open.org/wsn/b-2";
+ String NOTIFICATION_PREFIX = "wsnt";
+ String ADDRESSING_NS = "http://schemas.xmlsoap.org/ws/2004/08/addressing";
+ String ADDRESSING_PREFIX = "wsa";
+ String Subscribe = "Subscribe";
+ String ConsumerReference = "ConsumerReference";
+ String Address = "Address";
+ String ReferenceProperties = "ReferenceProperties";
+ String EndpointReference = "EndpointReference";
+ String BrokerID = "BrokerID";
+ String NewConsumer = "NewConsumer";
+ String NewProducer = "NewProducer";
+ String NewConsumerResponse = "NewConsumerResponse";
+ String NewProducerResponse = "NewProducerResponse";
+ String ConsumerSequenceType = "ConsumerSequenceType";
+ String EndConsumers = "EndConsumers";
+ String BrokerConsumers = "BrokerConsumers";
+ String NoConsumers = "NoConsumers";
+ String ProducerSequenceType = "ProducerSequenceType";
+ String EndProducers = "EndProducers";
+ String BrokerProducers = "BrokerProducers";
+ String NoProducers = "NoProducers";
+ String Broker = "Broker";
+ String NewBroker = "NewBroker";
+ String NewBrokerAck = "NewBrokerAck";
+ String BrokerConsumerReference = "BrokerConsumerReference";
+ String BrokerProducerReference = "BrokerProducerReference";
+ String NewBrokerResponse = "NewBrokerResponse";
+ String FirstBroker = "FirstBroker";
+ String Brokers = "Brokers";
+ String ConnectionOverride = "ConnectionOverride";
+ String ConnectionOverrideResponse = "ConnectionOverrideResponse";
+ String NeighborBrokerConsumers = "NeighborBrokerConsumers";
+ String RemoveBroker = "RemoveBroker";
+ String RemovedBroker = "RemovedBroker";
+ String Neighbors = "Neighbors";
+ String ReplaceBrokerConnection = "ReplaceBrokerConnection";
+
+ String SUBSCRIBE_OP = "subscribe";
+ String CONNECTION_OVERRIDE_OP = "connectionOverride";
+ String NEW_CONSUMER_OP = "newConsumer";
+ String NEW_PRODUCER_OP = "newProducer";
+ String NEW_BROKER_OP = "newBroker";
+ String NEW_BROKER_ACK_OP = "newBrokerAck";
+ String REMOVE_BROKER_OP = "removeBroker";
+ String REPLACE_BROKER_CONNECTION_OP = "replaceBrokerConnection";
+
+ String Broker_ID = "brokerID";
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConsumerReference.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConsumerReference.java
new file mode 100644
index 0000000000..c2c16ab5cd
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConsumerReference.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class ConsumerReference extends EndpointReferenceWrapper {
+
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConsumerReferenceEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConsumerReferenceEnDeCoder.java
new file mode 100644
index 0000000000..68099b40bb
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ConsumerReferenceEnDeCoder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import javax.xml.namespace.QName;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class ConsumerReferenceEnDeCoder extends EndpointReferenceWrapperEnDeCoder<ConsumerReference> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.ConsumerReference);
+
+ public ConsumerReferenceEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+
+ public QName getEncodingObjectQName() {
+ return QNAME;
+ }
+
+
+ public Class<ConsumerReference> getEncodingObjectType() {
+ return ConsumerReference.class;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/DefaultEncodingRegistry.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/DefaultEncodingRegistry.java
new file mode 100644
index 0000000000..9533ea3062
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/DefaultEncodingRegistry.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class DefaultEncodingRegistry implements EncodingRegistry {
+
+ private final Map<Class<? extends EncodingObject>, EnDeCoder> encoderRegistry =
+ new ConcurrentHashMap<Class<? extends EncodingObject>, EnDeCoder>();
+
+ private final Map<QName, EnDeCoder> decoderRegistry = new ConcurrentHashMap<QName, EnDeCoder>();
+
+ public DefaultEncodingRegistry() {
+ }
+
+ public <E extends EncodingObject> void registerEnDeCoder(Class<E> encodingClass, QName qname, EnDeCoder<E> enDeCoder) {
+
+ encoderRegistry.put(encodingClass, enDeCoder);
+ decoderRegistry.put(qname, enDeCoder);
+ }
+
+ public <E extends EncodingObject> void unregisterEnDeCoder(Class<E> encodingClass, QName qname) {
+
+ encoderRegistry.remove(encodingClass);
+ decoderRegistry.remove(qname);
+ }
+
+ @SuppressWarnings("unchecked")
+ public void encode(EncodingObject encodingObject, XMLStreamWriter writer) throws EncodingException {
+
+ EnDeCoder encoder = encoderRegistry.get(encodingObject.getClass());
+ if (encoder == null) {
+ throw new EncodingException("No encoder defined for " + encodingObject.getClass());
+ }
+ encoder.encode(encodingObject, writer);
+ }
+
+ public EncodingObject decode(XMLStreamReader reader) throws EncodingException {
+
+ QName qname = reader.getName();
+
+ EnDeCoder decoder = decoderRegistry.get(qname);
+ if (decoder == null) {
+ throw new EncodingException("No decoder defined for " + qname);
+ }
+ return decoder.decode(reader);
+ }
+
+ public void stop() {
+ encoderRegistry.clear();
+ decoderRegistry.clear();
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EnDeCoder.java
new file mode 100644
index 0000000000..159357ed90
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EnDeCoder.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public interface EnDeCoder<E extends EncodingObject> {
+
+ /**
+ * Encodes an object to the specified stream writer.
+ *
+ * @param encodingObject Object to be serialized.
+ * @param writer Stream writer to which the infoset is serialized.
+ * @throws EncodingException In case of any encoding error.
+ */
+ void encode(E encodingObject, XMLStreamWriter writer) throws EncodingException;
+
+ /**
+ * Decodes an XML stream to an object.
+ *
+ * @param reader XML stream from where the encoded XML is read.
+ * @return Encoding object.
+ * @throws EncodingException In case of any encoding error.
+ */
+ E decode(XMLStreamReader reader) throws EncodingException;
+
+ /**
+ * Gets the qualified name of the XML fragment for the Encoding
+ * object.
+ *
+ * @return Qualified name of the XML fragment.
+ */
+ QName getEncodingObjectQName();
+
+ /**
+ * Returns the type of the encoding object.
+ *
+ * @return Encoding object type.
+ */
+ Class<E> getEncodingObjectType();
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingException.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingException.java
new file mode 100644
index 0000000000..76e4999ddc
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+@SuppressWarnings("serial")
+public class EncodingException extends Exception {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Initializes the exception message.
+ *
+ * @param message Message for the exception.
+ */
+ public EncodingException(String message) {
+ super(message);
+ }
+
+ /**
+ * Initializes the root cause.
+ *
+ * @param cause Root cause for the exception.
+ */
+ public EncodingException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingObject.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingObject.java
new file mode 100644
index 0000000000..54bda38033
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingObject.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public interface EncodingObject {
+
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingRegistry.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingRegistry.java
new file mode 100644
index 0000000000..c0a324d663
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingRegistry.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public interface EncodingRegistry {
+
+ /**
+ * Registers an en/de coder.
+ *
+ * @param <E> Encoding object type.
+ * @param encodingClass Encoding obejct class.
+ * @param qname Qualified name of the root element of the encoded XML.
+ * @param enDeCoder Encoding object enDeCoder.
+ */
+ <E extends EncodingObject> void registerEnDeCoder(Class<E> encodingClass, QName qname, EnDeCoder<E> enDeCoder);
+
+ <E extends EncodingObject> void unregisterEnDeCoder(Class<E> encodingClass, QName qname);
+
+ /**
+ * Encodes an object.
+ *
+ * @param encodingObject Encoding object to be encoded.
+ * @param writer Writer to which encoded information is written.
+ */
+ void encode(EncodingObject encodingObject, XMLStreamWriter writer) throws EncodingException;
+
+ /**
+ * Decodes an XML stream to an encoding object.
+ *
+ * @param reader Reader from which encoded information is read.
+ * @return Encoding object from the encoded stream.
+ */
+ EncodingObject decode(XMLStreamReader reader) throws EncodingException;
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingUtils.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingUtils.java
new file mode 100644
index 0000000000..8318892e0f
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+import org.apache.tuscany.sca.binding.notification.util.IOUtils.IOUtilsException;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class EncodingUtils {
+
+ private static XMLOutputFactory xof = XMLOutputFactory.newInstance();
+ private static XMLInputFactory xif = XMLInputFactory.newInstance();
+
+ public static void encodeToStream(EncodingRegistry encodingRegistry,
+ EncodingObject eo,
+ OutputStream os) throws IOUtilsException {
+ try {
+ XMLStreamWriter writer = xof.createXMLStreamWriter(os);
+ encodingRegistry.encode(eo, writer);
+ writer.flush();
+ writer.close();
+ }
+ catch(Exception e) {
+ throw new IOUtilsException(e);
+ }
+ }
+
+ public static EncodingObject decodeFromStream(EncodingRegistry encodingRegistry,
+ InputStream istream) throws EncodingException {
+ EncodingObject eo = null;
+ try {
+ XMLStreamReader reader = xif.createXMLStreamReader(istream);
+ reader.next();
+ eo = encodingRegistry.decode(reader);
+ reader.close();
+ }
+ catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+
+ return eo;
+ }
+
+ public static EndpointReference createEndpointReference(URL address, String brokerID) {
+ EndpointAddress epa = new EndpointAddress();
+ epa.setAddress(address);
+ EndpointReference epr = new EndpointReference();
+ epr.setEndpointAddress(epa);
+ if (brokerID != null) {
+ BrokerID bi = new BrokerID();
+ bi.setID(brokerID);
+ ReferenceProperties rp = new ReferenceProperties();
+ rp.addProperty(bi);
+ epr.setReferenceProperties(rp);
+ }
+ return epr;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndConsumers.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndConsumers.java
new file mode 100644
index 0000000000..aaa9ddb7b5
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndConsumers.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class EndConsumers extends EndpointReferenceSequence {
+
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndConsumersEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndConsumersEnDeCoder.java
new file mode 100644
index 0000000000..26b8fd0357
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndConsumersEnDeCoder.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class EndConsumersEnDeCoder extends EndpointReferenceSequenceEnDeCoder<EndConsumers> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.EndConsumers);
+
+ public EndConsumersEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+
+ public QName getEncodingObjectQName() {
+ return QNAME;
+ }
+
+
+ public Class<EndConsumers> getEncodingObjectType() {
+ return EndConsumers.class;
+ }
+
+ @Override
+ protected void encodeSequenceTypeAttribute(EndConsumers encodingObject, XMLStreamWriter writer) throws EncodingException {
+ try {
+ writer.writeAttribute(Constants.ConsumerSequenceType, encodingObject.getSequenceType());
+ } catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+ }
+
+ @Override
+ protected String decodeSequenceTypeAttribute(XMLStreamReader reader) {
+ return reader.getAttributeValue(null, Constants.ConsumerSequenceType);
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndProducers.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndProducers.java
new file mode 100644
index 0000000000..b546f8c550
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndProducers.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class EndProducers extends EndpointReferenceSequence {
+
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndProducersEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndProducersEnDeCoder.java
new file mode 100644
index 0000000000..1039a3df9d
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndProducersEnDeCoder.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class EndProducersEnDeCoder extends EndpointReferenceSequenceEnDeCoder<EndProducers> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.EndProducers);
+
+ public EndProducersEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+
+ public QName getEncodingObjectQName() {
+ return QNAME;
+ }
+
+
+ public Class<EndProducers> getEncodingObjectType() {
+ return EndProducers.class;
+ }
+
+ @Override
+ protected void encodeSequenceTypeAttribute(EndProducers encodingObject, XMLStreamWriter writer) throws EncodingException {
+ try {
+ writer.writeAttribute(Constants.ProducerSequenceType, encodingObject.getSequenceType());
+ } catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+ }
+
+ @Override
+ protected String decodeSequenceTypeAttribute(XMLStreamReader reader) {
+ return reader.getAttributeValue(null, Constants.ProducerSequenceType);
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointAddress.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointAddress.java
new file mode 100644
index 0000000000..25eb336c05
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointAddress.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import java.net.URL;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class EndpointAddress implements EncodingObject {
+
+ private URL address;
+
+ public URL getAddress() {
+ return address;
+ }
+
+ public void setAddress(URL address) {
+ this.address = address;
+ }
+
+ public void setAddress(String addressText) throws EncodingException {
+ try {
+ this.address = new URL(addressText);
+ } catch(Exception e) {
+ throw new EncodingException(e);
+ }
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointAddressEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointAddressEnDeCoder.java
new file mode 100644
index 0000000000..76f2e8ec81
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointAddressEnDeCoder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import static javax.xml.stream.XMLStreamConstants.END_ELEMENT;
+import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamConstants;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class EndpointAddressEnDeCoder extends AbstractEnDeCoder<EndpointAddress> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.ADDRESSING_NS, Constants.Address);
+
+ public EndpointAddressEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+ public void encode(EndpointAddress encodingObject, XMLStreamWriter writer) throws EncodingException {
+
+ try {
+ writer.writeStartElement(Constants.ADDRESSING_PREFIX, QNAME.getLocalPart(), QNAME.getNamespaceURI());
+ writer.writeNamespace(Constants.ADDRESSING_PREFIX, QNAME.getNamespaceURI());
+ writer.writeCharacters(encodingObject.getAddress().toString());
+ writer.writeEndElement();
+ } catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+ }
+
+ public EndpointAddress decode(XMLStreamReader reader) throws EncodingException {
+
+ try {
+ EndpointAddress endpointAddressElement = new EndpointAddress();
+ while (true) {
+ switch (reader.next()) {
+ case START_ELEMENT:
+ break;
+ case XMLStreamConstants.CHARACTERS:
+ if (reader.hasText()) {
+ String address = reader.getText();
+ endpointAddressElement.setAddress(address);
+ }
+ else {
+ throw new EncodingException("Endpoint address is missing address");
+ }
+ break;
+ case END_ELEMENT:
+ return endpointAddressElement;
+ }
+ }
+ } catch (XMLStreamException ex) {
+ throw new EncodingException(ex);
+ }
+ }
+
+
+ public QName getEncodingObjectQName() {
+
+ return QNAME;
+ }
+
+
+ public Class<EndpointAddress> getEncodingObjectType() {
+
+ return EndpointAddress.class;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReference.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReference.java
new file mode 100644
index 0000000000..fa73da9a19
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReference.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class EndpointReference implements EncodingObject {
+
+ private EndpointAddress endpointAddress;
+ private ReferenceProperties referenceProperties;
+
+ public EndpointAddress getEndpointAddress() {
+ return this.endpointAddress;
+ }
+
+ public void setEndpointAddress(EndpointAddress endpointAddress) {
+ this.endpointAddress = endpointAddress;
+ }
+
+ public ReferenceProperties getReferenceProperties() {
+ return this.referenceProperties;
+ }
+
+ public void setReferenceProperties(ReferenceProperties referenceProperties) {
+ this.referenceProperties = referenceProperties;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceEnDeCoder.java
new file mode 100644
index 0000000000..8a05440319
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceEnDeCoder.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import static javax.xml.stream.XMLStreamConstants.END_ELEMENT;
+import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class EndpointReferenceEnDeCoder extends AbstractEnDeCoder<EndpointReference> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.ADDRESSING_NS, Constants.EndpointReference);
+
+ public EndpointReferenceEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+ public void encode(EndpointReference encodingObject, XMLStreamWriter writer) throws EncodingException {
+
+ try {
+ writer.writeStartElement(Constants.ADDRESSING_PREFIX, QNAME.getLocalPart(), QNAME.getNamespaceURI());
+ writer.writeNamespace(Constants.ADDRESSING_PREFIX, QNAME.getNamespaceURI());
+ registry.encode(encodingObject.getEndpointAddress(), writer);
+ if (encodingObject.getReferenceProperties() != null) {
+ registry.encode(encodingObject.getReferenceProperties(), writer);
+ }
+ writer.writeEndElement();
+ } catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+ }
+
+ public EndpointReference decode(XMLStreamReader reader) throws EncodingException {
+
+ try {
+ EndpointReference endpointReferenceElement = new EndpointReference();
+ boolean haveEPA = false;
+ boolean haveRP = false;
+ while (true) {
+ switch (reader.next()) {
+ case START_ELEMENT:
+ EncodingObject encodingObject = registry.decode(reader);
+ if (encodingObject instanceof EndpointAddress && !haveEPA) {
+ endpointReferenceElement.setEndpointAddress((EndpointAddress)encodingObject);
+ haveEPA = true;
+ }
+ else if(encodingObject instanceof ReferenceProperties && !haveRP) {
+ endpointReferenceElement.setReferenceProperties((ReferenceProperties)encodingObject);
+ haveRP = true;
+ }
+ else {
+ throw new EncodingException("Invalid encoding object");
+ }
+ break;
+ case END_ELEMENT:
+ if (!haveEPA) {
+ throw new EncodingException("Missing endpoint address");
+ }
+ return endpointReferenceElement;
+ }
+ }
+ } catch (Exception ex) {
+ throw new EncodingException(ex);
+ }
+ }
+
+
+ public QName getEncodingObjectQName() {
+
+ return QNAME;
+ }
+
+
+ public Class<EndpointReference> getEncodingObjectType() {
+
+ return EndpointReference.class;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceSequence.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceSequence.java
new file mode 100644
index 0000000000..39d53bca8d
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceSequence.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class EndpointReferenceSequence implements EncodingObject {
+
+ private List<EndpointReference> referenceSequence;
+ private String sequenceType;
+
+ public List<EndpointReference> getReferenceSequence() {
+ return referenceSequence;
+ }
+
+ public void addReferenceToSequence(EndpointReference address) {
+ if(this.referenceSequence == null) {
+ this.referenceSequence = new ArrayList<EndpointReference>();
+ }
+ this.referenceSequence.add(address);
+ }
+
+ public void setReferenceSequence(List<EndpointReference> referenceSequence) {
+ this.referenceSequence = referenceSequence;
+ }
+
+ public String getSequenceType() {
+ return this.sequenceType;
+ }
+
+ public void setSequenceType(String sequenceType) {
+ this.sequenceType = sequenceType;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceSequenceEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceSequenceEnDeCoder.java
new file mode 100644
index 0000000000..13d686b803
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceSequenceEnDeCoder.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import static javax.xml.stream.XMLStreamConstants.END_ELEMENT;
+import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public abstract class EndpointReferenceSequenceEnDeCoder<ERS extends EndpointReferenceSequence> extends AbstractEnDeCoder<ERS> {
+
+ public EndpointReferenceSequenceEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+ public void encode(ERS encodingObject, XMLStreamWriter writer) throws EncodingException {
+
+ try {
+ QName qName = getEncodingObjectQName();
+ writer.writeStartElement(Constants.NOTIFICATION_PREFIX, qName.getLocalPart(), qName.getNamespaceURI());
+ writer.writeNamespace(Constants.NOTIFICATION_PREFIX, qName.getNamespaceURI());
+ encodeSequenceTypeAttribute(encodingObject, writer);
+ if (encodingObject.getReferenceSequence() != null) {
+ for (EndpointReference endpointReference : encodingObject.getReferenceSequence()) {
+ registry.encode(endpointReference, writer);
+ }
+ }
+ writer.writeEndElement();
+ } catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+ }
+
+ protected abstract void encodeSequenceTypeAttribute(ERS encodingObject, XMLStreamWriter writer) throws EncodingException;
+
+ public ERS decode(XMLStreamReader reader) throws EncodingException {
+
+ try {
+ ERS endpointReferenceSequenceElement = null;
+ try {
+ endpointReferenceSequenceElement = getEncodingObjectType().newInstance();
+ } catch(Exception e) {
+ throw new EncodingException(e);
+ }
+ String sequenceType = decodeSequenceTypeAttribute(reader);
+ endpointReferenceSequenceElement.setSequenceType(sequenceType);
+ while (true) {
+ switch (reader.next()) {
+ case START_ELEMENT:
+ EncodingObject encodingObject = registry.decode(reader);
+ endpointReferenceSequenceElement.addReferenceToSequence((EndpointReference)encodingObject);
+ break;
+ case END_ELEMENT:
+ return endpointReferenceSequenceElement;
+ }
+ }
+ } catch (XMLStreamException ex) {
+ throw new EncodingException(ex);
+ }
+ }
+
+ protected abstract String decodeSequenceTypeAttribute(XMLStreamReader reader);
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceWrapper.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceWrapper.java
new file mode 100644
index 0000000000..f1f1a7737b
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceWrapper.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class EndpointReferenceWrapper implements EncodingObject {
+
+ private EndpointReference reference;
+
+ public EndpointReference getReference() {
+ return reference;
+ }
+
+ public void setReference(EndpointReference reference) {
+ this.reference = reference;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceWrapperEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceWrapperEnDeCoder.java
new file mode 100644
index 0000000000..aeafce46e0
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/EndpointReferenceWrapperEnDeCoder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import static javax.xml.stream.XMLStreamConstants.END_ELEMENT;
+import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public abstract class EndpointReferenceWrapperEnDeCoder<ERW extends EndpointReferenceWrapper> extends AbstractEnDeCoder<ERW> {
+
+ public EndpointReferenceWrapperEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+ public void encode(ERW encodingObject, XMLStreamWriter writer) throws EncodingException {
+
+ try {
+ QName qName = getEncodingObjectQName();
+ writer.writeStartElement(Constants.NOTIFICATION_PREFIX, qName.getLocalPart(), qName.getNamespaceURI());
+ writer.writeNamespace(Constants.NOTIFICATION_PREFIX, qName.getNamespaceURI());
+ registry.encode(encodingObject.getReference(), writer);
+ writer.writeEndElement();
+ } catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+ }
+
+ public ERW decode(XMLStreamReader reader) throws EncodingException {
+
+ try {
+ ERW endpointReferenceWrapperElement = null;
+ try {
+ endpointReferenceWrapperElement = getEncodingObjectType().newInstance();
+ } catch(Exception e) {
+ throw new EncodingException(e);
+ }
+ while (true) {
+ switch (reader.next()) {
+ case START_ELEMENT:
+ EncodingObject encodingObject = registry.decode(reader);
+ endpointReferenceWrapperElement.setReference((EndpointReference)encodingObject);
+ break;
+ case END_ELEMENT:
+ return endpointReferenceWrapperElement;
+ }
+ }
+ } catch (XMLStreamException ex) {
+ throw new EncodingException(ex);
+ }
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NeighborBrokerConsumers.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NeighborBrokerConsumers.java
new file mode 100644
index 0000000000..5871d370fb
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NeighborBrokerConsumers.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class NeighborBrokerConsumers extends EndpointReferenceSequence {
+
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NeighborBrokerConsumersEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NeighborBrokerConsumersEnDeCoder.java
new file mode 100644
index 0000000000..ba52687e16
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NeighborBrokerConsumersEnDeCoder.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class NeighborBrokerConsumersEnDeCoder extends EndpointReferenceSequenceEnDeCoder<NeighborBrokerConsumers> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.NeighborBrokerConsumers);
+
+ public NeighborBrokerConsumersEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+
+ public QName getEncodingObjectQName() {
+ return QNAME;
+ }
+
+
+ public Class<NeighborBrokerConsumers> getEncodingObjectType() {
+ return NeighborBrokerConsumers.class;
+ }
+
+ @Override
+ protected void encodeSequenceTypeAttribute(NeighborBrokerConsumers encodingObject, XMLStreamWriter writer) throws EncodingException {
+ try {
+ writer.writeAttribute(Constants.ConsumerSequenceType, encodingObject.getSequenceType());
+ } catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+ }
+
+ @Override
+ protected String decodeSequenceTypeAttribute(XMLStreamReader reader) {
+ return reader.getAttributeValue(null, Constants.ConsumerSequenceType);
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Neighbors.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Neighbors.java
new file mode 100644
index 0000000000..0aec2da465
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Neighbors.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class Neighbors implements EncodingObject {
+
+ private List<Broker> brokerSequence;
+
+ public List<Broker> getBrokerSequence() {
+ return brokerSequence;
+ }
+
+ public void addBrokerToSequence(Broker broker) {
+ if(this.brokerSequence == null) {
+ this.brokerSequence = new ArrayList<Broker>();
+ }
+ this.brokerSequence.add(broker);
+ }
+
+ public void setBrokerSequence(List<Broker> brokerSequence) {
+ this.brokerSequence = brokerSequence;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NeighborsEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NeighborsEnDeCoder.java
new file mode 100644
index 0000000000..53f56b6626
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NeighborsEnDeCoder.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import static javax.xml.stream.XMLStreamConstants.END_ELEMENT;
+import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class NeighborsEnDeCoder extends AbstractEnDeCoder<Neighbors> {
+
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.Neighbors);
+
+ public NeighborsEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+ public void encode(Neighbors encodingObject, XMLStreamWriter writer) throws EncodingException {
+
+ try {
+ QName qName = getEncodingObjectQName();
+ writer.writeStartElement(Constants.NOTIFICATION_PREFIX, qName.getLocalPart(), qName.getNamespaceURI());
+ writer.writeNamespace(Constants.NOTIFICATION_PREFIX, qName.getNamespaceURI());
+ if (encodingObject.getBrokerSequence() != null) {
+ for (Broker broker : encodingObject.getBrokerSequence()) {
+ registry.encode(broker, writer);
+ }
+ }
+ writer.writeEndElement();
+ } catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+ }
+
+ public Neighbors decode(XMLStreamReader reader) throws EncodingException {
+
+ try {
+ Neighbors neighborsElement = new Neighbors();
+ while (true) {
+ switch (reader.next()) {
+ case START_ELEMENT:
+ EncodingObject encodingObject = registry.decode(reader);
+ neighborsElement.addBrokerToSequence((Broker)encodingObject);
+ break;
+ case END_ELEMENT:
+ return neighborsElement;
+ }
+ }
+ } catch (XMLStreamException ex) {
+ throw new EncodingException(ex);
+ }
+ }
+
+
+ public QName getEncodingObjectQName() {
+
+ return QNAME;
+ }
+
+
+ public Class<Neighbors> getEncodingObjectType() {
+
+ return Neighbors.class;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBroker.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBroker.java
new file mode 100644
index 0000000000..1684d0c3ee
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBroker.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class NewBroker extends AbstractBroker {
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerAck.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerAck.java
new file mode 100644
index 0000000000..f4702806e0
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerAck.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class NewBrokerAck implements EncodingObject {
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerAckEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerAckEnDeCoder.java
new file mode 100644
index 0000000000..fcd0717688
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerAckEnDeCoder.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import static javax.xml.stream.XMLStreamConstants.END_ELEMENT;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class NewBrokerAckEnDeCoder extends AbstractEnDeCoder<NewBrokerAck> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.NewBrokerAck);
+
+ public NewBrokerAckEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+ public void encode(NewBrokerAck encodingObject, XMLStreamWriter writer) throws EncodingException {
+
+ try {
+ writer.writeStartElement(Constants.NOTIFICATION_PREFIX, QNAME.getLocalPart(), QNAME.getNamespaceURI());
+ writer.writeNamespace(Constants.NOTIFICATION_PREFIX, QNAME.getNamespaceURI());
+ writer.writeEndElement();
+ } catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+ }
+
+ public NewBrokerAck decode(XMLStreamReader reader) throws EncodingException {
+
+ try {
+ NewBrokerAck newBrokerAck = new NewBrokerAck();
+ while (true) {
+ switch (reader.next()) {
+ case END_ELEMENT:
+ return newBrokerAck;
+ }
+ }
+ } catch (Exception ex) {
+ throw new EncodingException(ex);
+ }
+ }
+
+
+ public QName getEncodingObjectQName() {
+
+ return QNAME;
+ }
+
+
+ public Class<NewBrokerAck> getEncodingObjectType() {
+
+ return NewBrokerAck.class;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerEnDeCoder.java
new file mode 100644
index 0000000000..db65ba7491
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerEnDeCoder.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import javax.xml.namespace.QName;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class NewBrokerEnDeCoder extends AbstractBrokerEnDeCoder<NewBroker> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.NewBroker);
+
+ public NewBrokerEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+
+ public QName getEncodingObjectQName() {
+
+ return QNAME;
+ }
+
+
+ public Class<NewBroker> getEncodingObjectType() {
+
+ return NewBroker.class;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerResponse.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerResponse.java
new file mode 100644
index 0000000000..72aa20af3d
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerResponse.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class NewBrokerResponse implements EncodingObject {
+
+ private EndProducers endProducers;
+ private EndConsumers endConsumers;
+ private Brokers brokers;
+ private boolean firstBroker;
+
+ public EndProducers getEndProducers() {
+ return this.endProducers;
+ }
+
+ public void setEndProducers(EndProducers endProducers) {
+ this.endProducers = endProducers;
+ }
+
+ public EndConsumers getEndConsumers() {
+ return this.endConsumers;
+ }
+
+ public void setEndConsumers(EndConsumers endConsumers) {
+ this.endConsumers = endConsumers;
+ }
+
+ public Brokers getBrokers() {
+ return this.brokers;
+ }
+
+ public void setBrokers(Brokers brokers) {
+ this.brokers = brokers;
+ }
+
+ public boolean isFirstBroker() {
+ return this.firstBroker;
+ }
+
+ public void setFirstBroker(boolean firstBroker) {
+ this.firstBroker = firstBroker;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerResponseEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerResponseEnDeCoder.java
new file mode 100644
index 0000000000..391820444f
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewBrokerResponseEnDeCoder.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import static javax.xml.stream.XMLStreamConstants.END_ELEMENT;
+import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class NewBrokerResponseEnDeCoder extends AbstractEnDeCoder<NewBrokerResponse> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.NewBrokerResponse);
+
+ public NewBrokerResponseEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+ public void encode(NewBrokerResponse encodingObject, XMLStreamWriter writer) throws EncodingException {
+
+ try {
+ QName qName = getEncodingObjectQName();
+ writer.writeStartElement(Constants.NOTIFICATION_PREFIX, qName.getLocalPart(), qName.getNamespaceURI());
+ writer.writeNamespace(Constants.NOTIFICATION_PREFIX, qName.getNamespaceURI());
+ writer.writeAttribute(Constants.FirstBroker, String.valueOf(encodingObject.isFirstBroker()));
+ if (encodingObject.isFirstBroker()) {
+ registry.encode(encodingObject.getEndConsumers(), writer);
+ registry.encode(encodingObject.getEndProducers(), writer);
+ }
+ else {
+ registry.encode(encodingObject.getBrokers(), writer);
+ }
+ writer.writeEndElement();
+ } catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+ }
+
+ public NewBrokerResponse decode(XMLStreamReader reader) throws EncodingException {
+
+ try {
+ NewBrokerResponse newBrokerResponseElement = new NewBrokerResponse();
+ boolean firstBroker = Boolean.parseBoolean(reader.getAttributeValue(null, Constants.FirstBroker));
+ newBrokerResponseElement.setFirstBroker(firstBroker);
+ boolean haveEC = false;
+ boolean haveEP = false;
+ boolean haveB = false;
+ while (true) {
+ switch (reader.next()) {
+ case START_ELEMENT:
+ EncodingObject encodingObject = registry.decode(reader);
+ if (encodingObject instanceof EndProducers && !haveEP && firstBroker) {
+ newBrokerResponseElement.setEndProducers((EndProducers)encodingObject);
+ haveEP = true;
+ }
+ else if(encodingObject instanceof EndConsumers && !haveEC && firstBroker) {
+ newBrokerResponseElement.setEndConsumers((EndConsumers)encodingObject);
+ haveEC = true;
+ }
+ else if(encodingObject instanceof Brokers && !haveB && !firstBroker) {
+ newBrokerResponseElement.setBrokers((Brokers)encodingObject);
+ haveB = true;
+ }
+ else {
+ throw new EncodingException("Invalid encoding object");
+ }
+ break;
+ case END_ELEMENT:
+ if (!haveEP && firstBroker) {
+ throw new EncodingException("Missing end producers");
+ }
+ if (!haveEC && firstBroker) {
+ throw new EncodingException("Missing end consumers");
+ }
+ if (!haveB && !firstBroker) {
+ throw new EncodingException("Missing brokers");
+ }
+ return newBrokerResponseElement;
+ }
+ }
+ } catch (Exception ex) {
+ throw new EncodingException(ex);
+ }
+ }
+
+
+ public QName getEncodingObjectQName() {
+
+ return QNAME;
+ }
+
+
+ public Class<NewBrokerResponse> getEncodingObjectType() {
+
+ return NewBrokerResponse.class;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumer.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumer.java
new file mode 100644
index 0000000000..2c0c846841
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class NewConsumer extends EndpointReferenceWrapper {
+
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumerEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumerEnDeCoder.java
new file mode 100644
index 0000000000..8314d628ba
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumerEnDeCoder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import javax.xml.namespace.QName;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class NewConsumerEnDeCoder extends EndpointReferenceWrapperEnDeCoder<NewConsumer> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.NewConsumer);
+
+ public NewConsumerEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+
+ public QName getEncodingObjectQName() {
+ return QNAME;
+ }
+
+
+ public Class<NewConsumer> getEncodingObjectType() {
+ return NewConsumer.class;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumerResponse.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumerResponse.java
new file mode 100644
index 0000000000..b89bfa2470
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumerResponse.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class NewConsumerResponse extends EndpointReferenceSequence {
+
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumerResponseEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumerResponseEnDeCoder.java
new file mode 100644
index 0000000000..1abb187271
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewConsumerResponseEnDeCoder.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class NewConsumerResponseEnDeCoder extends EndpointReferenceSequenceEnDeCoder<NewConsumerResponse> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.NewConsumerResponse);
+
+ public NewConsumerResponseEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+
+ public QName getEncodingObjectQName() {
+ return QNAME;
+ }
+
+
+ public Class<NewConsumerResponse> getEncodingObjectType() {
+ return NewConsumerResponse.class;
+ }
+
+ @Override
+ protected void encodeSequenceTypeAttribute(NewConsumerResponse encodingObject, XMLStreamWriter writer) throws EncodingException {
+ try {
+ writer.writeAttribute(Constants.ProducerSequenceType, encodingObject.getSequenceType());
+ } catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+ }
+
+ @Override
+ protected String decodeSequenceTypeAttribute(XMLStreamReader reader) {
+ return reader.getAttributeValue(null, Constants.ProducerSequenceType);
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducer.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducer.java
new file mode 100644
index 0000000000..f12a4582dd
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class NewProducer extends EndpointReferenceWrapper {
+
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducerEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducerEnDeCoder.java
new file mode 100644
index 0000000000..75ee8aea6f
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducerEnDeCoder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import javax.xml.namespace.QName;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class NewProducerEnDeCoder extends EndpointReferenceWrapperEnDeCoder<NewProducer> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.NewProducer);
+
+ public NewProducerEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+
+ public QName getEncodingObjectQName() {
+ return QNAME;
+ }
+
+
+ public Class<NewProducer> getEncodingObjectType() {
+ return NewProducer.class;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducerResponse.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducerResponse.java
new file mode 100644
index 0000000000..91bf371549
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducerResponse.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class NewProducerResponse extends EndpointReferenceSequence {
+
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducerResponseEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducerResponseEnDeCoder.java
new file mode 100644
index 0000000000..34dee40761
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/NewProducerResponseEnDeCoder.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class NewProducerResponseEnDeCoder extends EndpointReferenceSequenceEnDeCoder<NewProducerResponse> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.NewProducerResponse);
+
+ public NewProducerResponseEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+
+ public QName getEncodingObjectQName() {
+ return QNAME;
+ }
+
+
+ public Class<NewProducerResponse> getEncodingObjectType() {
+ return NewProducerResponse.class;
+ }
+
+ @Override
+ protected void encodeSequenceTypeAttribute(NewProducerResponse encodingObject, XMLStreamWriter writer) throws EncodingException {
+ try {
+ writer.writeAttribute(Constants.ConsumerSequenceType, encodingObject.getSequenceType());
+ } catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+ }
+
+ @Override
+ protected String decodeSequenceTypeAttribute(XMLStreamReader reader) {
+ return reader.getAttributeValue(null, Constants.ConsumerSequenceType);
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReferenceProperties.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReferenceProperties.java
new file mode 100644
index 0000000000..0f661717c0
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReferenceProperties.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class ReferenceProperties implements EncodingObject {
+
+ private List<EncodingObject> properties;
+
+ public List<EncodingObject> getProperties() {
+ return properties;
+ }
+
+ public void addProperty(EncodingObject property) {
+ if(this.properties == null) {
+ this.properties = new ArrayList<EncodingObject>();
+ }
+ this.properties.add(property);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <E extends EncodingObject> E getProperty(Class<E> propertyType) {
+ if (this.properties == null) {
+ return null;
+ }
+ for (EncodingObject eo : properties) {
+ if (propertyType.isInstance(eo)) {
+ return (E)eo;
+ }
+ }
+ return null;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReferencePropertiesEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReferencePropertiesEnDeCoder.java
new file mode 100644
index 0000000000..6ffbd8724e
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReferencePropertiesEnDeCoder.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import static javax.xml.stream.XMLStreamConstants.END_ELEMENT;
+import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class ReferencePropertiesEnDeCoder extends AbstractEnDeCoder<ReferenceProperties> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.ADDRESSING_NS, Constants.ReferenceProperties);
+
+ public ReferencePropertiesEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+ public void encode(ReferenceProperties encodingObject, XMLStreamWriter writer) throws EncodingException {
+
+ try {
+ writer.writeStartElement(Constants.ADDRESSING_PREFIX, QNAME.getLocalPart(), QNAME.getNamespaceURI());
+ writer.writeNamespace(Constants.ADDRESSING_PREFIX, QNAME.getNamespaceURI());
+ if (encodingObject.getProperties() != null) {
+ for (EncodingObject property : encodingObject.getProperties()) {
+ registry.encode(property, writer);
+ }
+ }
+ writer.writeEndElement();
+ } catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+ }
+
+ public ReferenceProperties decode(XMLStreamReader reader) throws EncodingException {
+
+ try {
+ ReferenceProperties referencePropertiesElement = new ReferenceProperties();
+ while (true) {
+ switch (reader.next()) {
+ case START_ELEMENT:
+ EncodingObject property = registry.decode(reader);
+ referencePropertiesElement.addProperty(property);
+ break;
+ case END_ELEMENT:
+ return referencePropertiesElement;
+ }
+ }
+ } catch (XMLStreamException ex) {
+ throw new EncodingException(ex);
+ }
+ }
+
+
+ public QName getEncodingObjectQName() {
+
+ return QNAME;
+ }
+
+
+ public Class<ReferenceProperties> getEncodingObjectType() {
+
+ return ReferenceProperties.class;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemoveBroker.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemoveBroker.java
new file mode 100644
index 0000000000..0047f2b6cb
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemoveBroker.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class RemoveBroker implements EncodingObject {
+
+ private BrokerConsumerReference brokerConsumerReference;
+ private NeighborBrokerConsumers neighborBrokerConsumers;
+
+ public BrokerConsumerReference getBrokerConsumerReference() {
+ return this.brokerConsumerReference;
+ }
+
+ public void setBrokerConsumerReference(BrokerConsumerReference brokerConsumerReference) {
+ this.brokerConsumerReference = brokerConsumerReference;
+ }
+
+ public NeighborBrokerConsumers getNeighborBrokerConsumers() {
+ return this.neighborBrokerConsumers;
+ }
+
+ public void setNeighborBrokerConsumers(NeighborBrokerConsumers neighborBrokerConsumers) {
+ this.neighborBrokerConsumers = neighborBrokerConsumers;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemoveBrokerEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemoveBrokerEnDeCoder.java
new file mode 100644
index 0000000000..56e6c361c3
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemoveBrokerEnDeCoder.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import static javax.xml.stream.XMLStreamConstants.END_ELEMENT;
+import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class RemoveBrokerEnDeCoder extends AbstractEnDeCoder<RemoveBroker> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.RemoveBroker);
+
+ public RemoveBrokerEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+ public void encode(RemoveBroker encodingObject, XMLStreamWriter writer) throws EncodingException {
+
+ try {
+ writer.writeStartElement(Constants.NOTIFICATION_PREFIX, QNAME.getLocalPart(), QNAME.getNamespaceURI());
+ writer.writeNamespace(Constants.NOTIFICATION_PREFIX, QNAME.getNamespaceURI());
+ registry.encode(encodingObject.getBrokerConsumerReference(), writer);
+ if (encodingObject.getNeighborBrokerConsumers() != null) {
+ registry.encode(encodingObject.getNeighborBrokerConsumers(), writer);
+ }
+ writer.writeEndElement();
+ } catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+ }
+
+ public RemoveBroker decode(XMLStreamReader reader) throws EncodingException {
+
+ try {
+ RemoveBroker removeBrokerElement = new RemoveBroker();
+ boolean haveBCR = false;
+ boolean haveNBC = false;
+ while (true) {
+ switch (reader.next()) {
+ case START_ELEMENT:
+ EncodingObject encodingObject = registry.decode(reader);
+ if (encodingObject instanceof BrokerConsumerReference && !haveBCR) {
+ removeBrokerElement.setBrokerConsumerReference((BrokerConsumerReference)encodingObject);
+ haveBCR = true;
+ }
+ else if(encodingObject instanceof NeighborBrokerConsumers && !haveNBC) {
+ removeBrokerElement.setNeighborBrokerConsumers((NeighborBrokerConsumers)encodingObject);
+ haveNBC = true;
+ }
+ else {
+ throw new EncodingException("Invalid encoding object");
+ }
+ break;
+ case END_ELEMENT:
+ if (!haveBCR) {
+ throw new EncodingException("Missing broker consumer reference");
+ }
+ return removeBrokerElement;
+ }
+ }
+ } catch (Exception ex) {
+ throw new EncodingException(ex);
+ }
+ }
+
+
+ public QName getEncodingObjectQName() {
+
+ return QNAME;
+ }
+
+
+ public Class<RemoveBroker> getEncodingObjectType() {
+
+ return RemoveBroker.class;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemovedBroker.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemovedBroker.java
new file mode 100644
index 0000000000..40795987d0
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemovedBroker.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class RemovedBroker extends EndpointReferenceWrapper {
+
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemovedBrokerEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemovedBrokerEnDeCoder.java
new file mode 100644
index 0000000000..3f0ddc546c
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/RemovedBrokerEnDeCoder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import javax.xml.namespace.QName;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class RemovedBrokerEnDeCoder extends EndpointReferenceWrapperEnDeCoder<RemovedBroker> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.RemovedBroker);
+
+ public RemovedBrokerEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+
+ public QName getEncodingObjectQName() {
+ return QNAME;
+ }
+
+
+ public Class<RemovedBroker> getEncodingObjectType() {
+ return RemovedBroker.class;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReplaceBrokerConnection.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReplaceBrokerConnection.java
new file mode 100644
index 0000000000..68bc368a53
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReplaceBrokerConnection.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class ReplaceBrokerConnection implements EncodingObject {
+
+ private RemovedBroker removedBroker;
+ private Neighbors neighbors;
+
+ public RemovedBroker getRemovedBroker() {
+ return this.removedBroker;
+ }
+
+ public void setRemovedBroker(RemovedBroker removedBroker) {
+ this.removedBroker = removedBroker;
+ }
+
+ public Neighbors getNeighbors() {
+ return this.neighbors;
+ }
+
+ public void setNeighbors(Neighbors neighbors) {
+ this.neighbors = neighbors;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReplaceBrokerConnectionEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReplaceBrokerConnectionEnDeCoder.java
new file mode 100644
index 0000000000..057d686905
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/ReplaceBrokerConnectionEnDeCoder.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import static javax.xml.stream.XMLStreamConstants.END_ELEMENT;
+import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class ReplaceBrokerConnectionEnDeCoder extends AbstractEnDeCoder<ReplaceBrokerConnection> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.ReplaceBrokerConnection);
+
+ public ReplaceBrokerConnectionEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+ public void encode(ReplaceBrokerConnection encodingObject, XMLStreamWriter writer) throws EncodingException {
+
+ try {
+ writer.writeStartElement(Constants.NOTIFICATION_PREFIX, QNAME.getLocalPart(), QNAME.getNamespaceURI());
+ writer.writeNamespace(Constants.NOTIFICATION_PREFIX, QNAME.getNamespaceURI());
+ registry.encode(encodingObject.getRemovedBroker(), writer);
+ if (encodingObject.getNeighbors() != null) {
+ registry.encode(encodingObject.getNeighbors(), writer);
+ }
+ writer.writeEndElement();
+ } catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+ }
+
+ public ReplaceBrokerConnection decode(XMLStreamReader reader) throws EncodingException {
+
+ try {
+ ReplaceBrokerConnection replaceBrokerConnectionElement = new ReplaceBrokerConnection();
+ boolean haveRB = false;
+ boolean haveN = false;
+ while (true) {
+ switch (reader.next()) {
+ case START_ELEMENT:
+ EncodingObject encodingObject = registry.decode(reader);
+ if (encodingObject instanceof RemovedBroker && !haveRB) {
+ replaceBrokerConnectionElement.setRemovedBroker((RemovedBroker)encodingObject);
+ haveRB = true;
+ }
+ else if(encodingObject instanceof Neighbors && !haveN) {
+ replaceBrokerConnectionElement.setNeighbors((Neighbors)encodingObject);
+ haveN = true;
+ }
+ else {
+ throw new EncodingException("Invalid encoding object");
+ }
+ break;
+ case END_ELEMENT:
+ if (!haveRB) {
+ throw new EncodingException("Missing removed broker");
+ }
+ return replaceBrokerConnectionElement;
+ }
+ }
+ } catch (Exception ex) {
+ throw new EncodingException(ex);
+ }
+ }
+
+
+ public QName getEncodingObjectQName() {
+
+ return QNAME;
+ }
+
+
+ public Class<ReplaceBrokerConnection> getEncodingObjectType() {
+
+ return ReplaceBrokerConnection.class;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Subscribe.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Subscribe.java
new file mode 100644
index 0000000000..d87c9d4eb5
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/Subscribe.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class Subscribe implements EncodingObject {
+
+ private ConsumerReference consumerReference;
+
+ public ConsumerReference getConsumerReference() {
+ return consumerReference;
+ }
+
+ public void setConsumerReference(ConsumerReference consumerReference) {
+ this.consumerReference = consumerReference;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/SubscribeEnDeCoder.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/SubscribeEnDeCoder.java
new file mode 100644
index 0000000000..8f5d368e05
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/encoding/SubscribeEnDeCoder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import static javax.xml.stream.XMLStreamConstants.END_ELEMENT;
+import static javax.xml.stream.XMLStreamConstants.START_ELEMENT;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class SubscribeEnDeCoder extends AbstractEnDeCoder<Subscribe> {
+
+ // QName for the root element
+ public static final QName QNAME = new QName(Constants.NOTIFICATION_NS, Constants.Subscribe);
+
+ public SubscribeEnDeCoder(EncodingRegistry registry) {
+ super(registry);
+ }
+
+ public void encode(Subscribe encodingObject, XMLStreamWriter writer) throws EncodingException {
+
+ try {
+ writer.writeStartElement(Constants.NOTIFICATION_PREFIX, QNAME.getLocalPart(), QNAME.getNamespaceURI());
+ writer.writeNamespace(Constants.NOTIFICATION_PREFIX, QNAME.getNamespaceURI());
+ registry.encode(encodingObject.getConsumerReference(), writer);
+ writer.writeEndElement();
+ } catch(XMLStreamException e) {
+ throw new EncodingException(e);
+ }
+ }
+
+ public Subscribe decode(XMLStreamReader reader) throws EncodingException {
+
+ try {
+ Subscribe subscribeElement = new Subscribe();
+ while (true) {
+ switch (reader.next()) {
+ case START_ELEMENT:
+ EncodingObject encodingObject = registry.decode(reader);
+ subscribeElement.setConsumerReference((ConsumerReference)encodingObject);
+ break;
+ case END_ELEMENT:
+ return subscribeElement;
+ }
+ }
+ } catch (XMLStreamException ex) {
+ throw new EncodingException(ex);
+ }
+ }
+
+
+ public QName getEncodingObjectQName() {
+
+ return QNAME;
+ }
+
+
+ public Class<Subscribe> getEncodingObjectType() {
+
+ return Subscribe.class;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/util/IOUtils.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/util/IOUtils.java
new file mode 100644
index 0000000000..a45c6807e6
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/util/IOUtils.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class IOUtils {
+
+ // FIXME: For some reason, tomcat converts the header names to be lower case, see TUSCANY-1791
+ public static final String Notification_Source = "notification-source";
+ public static final String Notification_Target = "notification-target";
+ public static final String Notification_Operation = "notification-operation";
+
+ public static final int DEF_BLOCK_SIZE = 512;
+
+ public static Object sendHttpRequest(URL targetURL,
+ String opName,
+ Writeable wbody,
+ ReadableContinuation rcont) throws Exception {
+ if (opName == null) {
+ opName = "";
+ }
+ Map<String, String> headers = new HashMap<String, String>();
+ headers.put(Notification_Operation, opName);
+ return sendHttpRequest(targetURL, headers, wbody, rcont);
+ }
+
+ public static Object sendHttpRequest(URL targetURL,
+ Map<String, String> headers,
+ Writeable wbody,
+ ReadableContinuation rcont) throws Exception {
+
+ String targetUri = targetURL.toString();
+ String sourceUri = "";
+
+ final HttpURLConnection con = (HttpURLConnection) targetURL.openConnection();
+ con.setRequestMethod("POST");
+ //con.setRequestProperty("Content-Length", Integer.toString(sbody.getBytes().length));
+ con.setAllowUserInteraction(false);
+ con.setInstanceFollowRedirects(false);
+ if (targetUri != null) {
+ con.setRequestProperty(Notification_Target, targetUri);
+ }
+
+ if (sourceUri != null) {
+ con.setRequestProperty(Notification_Source, sourceUri);
+ }
+
+ for (String key : headers.keySet()) {
+ con.setRequestProperty(key, headers.get(key));
+ }
+ con.setDoOutput(true);
+ con.setDoInput(true);
+ con.connect();
+ Object response = null;
+ try {
+ if (wbody != null) {
+ OutputStream ost = con.getOutputStream();
+ wbody.write(ost);
+ }
+ else {
+ throw new IOUtilsException("Missing writeable body");
+ }
+ final int rc = con.getResponseCode();
+ switch (rc) {
+ case HttpURLConnection.HTTP_OK:
+ if (rcont != null) {
+ InputStream ist = con.getInputStream();
+ response = rcont.read(ist);
+ }
+ break;
+ case HttpURLConnection.HTTP_NO_CONTENT:
+ break;
+ default:
+ throw new RuntimeException("Unexpected response code: " + rc);
+ }
+ }
+ finally
+ {
+ con.disconnect();
+ }
+ return response;
+ }
+
+ public interface Writeable {
+ void write(OutputStream os) throws IOUtilsException;
+ }
+
+ public interface ReadableContinuation {
+ Object read(InputStream is) throws IOUtilsException;
+ }
+
+ @SuppressWarnings("serial")
+ public static class IOUtilsException extends Exception {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ public IOUtilsException(String message) {
+ super(message);
+ }
+
+ public IOUtilsException(Throwable cause) {
+ super(cause);
+ }
+ }
+
+ public static byte [] readFully(final InputStream ist, int len) throws IOException {
+ ByteArrayOutputStream baost = new ByteArrayOutputStream();
+ copyStream(ist,baost,len);
+ return baost.toByteArray();
+ }
+
+ public static int copyStream(final InputStream ist, final OutputStream ost) throws IOException {
+ return copyStream(ist, ost, -1, 0);
+ }
+
+ public static int copyStream(final InputStream ist, final OutputStream ost, int length) throws IOException {
+ return copyStream(ist, ost, length, 0);
+ }
+
+ public static int copyStream(final InputStream ist, final OutputStream ost, final int length, int blockSize) throws IOException {
+
+ int cbCopied = 0;
+ if (blockSize <= 0) {
+ blockSize = DEF_BLOCK_SIZE;
+ }
+
+ final byte[] block = new byte[blockSize];
+ boolean done = length == 0;
+ while (!done) {
+ try {
+ // determine how many bytes to read
+ final int cbToRead = length == -1 ? block.length : (Math.min(length - cbCopied, block.length));
+ final int cbRead = ist.read(block, 0, cbToRead);
+ if (cbRead == -1) {
+ done = true;
+ }
+ else {
+ ost.write(block, 0, cbRead);
+ cbCopied += cbRead;
+ done = cbCopied == length;
+ }
+ } catch (final EOFException e) {
+ done = true;
+ }
+ }
+ ost.flush();
+ return cbCopied;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/util/NotificationServlet.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/util/NotificationServlet.java
new file mode 100644
index 0000000000..046914e9db
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/util/NotificationServlet.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.util;
+
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+
+/**
+ * Receives notification in HTTP request and dispatches it down the wire
+ *
+ * @version $Rev$ $Date$
+ */
+@SuppressWarnings("serial")
+public class NotificationServlet extends HttpServlet {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ private NotificationServletHandler handler;
+ private NotificationServletStreamHandler servletStreamHandler;
+
+ public NotificationServlet(NotificationServletHandler handler) {
+ this.handler = handler;
+ this.servletStreamHandler = null;
+ }
+
+ public NotificationServlet(NotificationServletStreamHandler servletStreamHandler) {
+ this.handler = null;
+ this.servletStreamHandler = servletStreamHandler;
+ }
+
+ @Override
+ public void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException {
+ HashMap<String, String> headers = new HashMap<String, String>();
+ Enumeration headerNames = request.getHeaderNames();
+ while (headerNames.hasMoreElements()) {
+ String headerName = (String)headerNames.nextElement();
+ headers.put(headerName, request.getHeader(headerName));
+ }
+ if (handler != null) {
+ byte[] requestBody = IOUtils.readFully(request.getInputStream(), request.getContentLength());
+ byte[] handlersResponse = handler.handle(headers, requestBody);
+ if (handlersResponse != null) {
+ response.getOutputStream().write(handlersResponse);
+ response.getOutputStream().flush();
+ }
+ }
+ else {
+ try {
+ servletStreamHandler.handle(headers, request.getInputStream(), request.getContentLength(), response.getOutputStream());
+ }
+ catch(RuntimeException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public interface NotificationServletHandler {
+ public byte[] handle(Map<String, String> headers, byte[] payload);
+ }
+
+ public interface NotificationServletStreamHandler {
+ public void handle(Map<String, String> headers, ServletInputStream istream, int contentLength, ServletOutputStream ostream);
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/util/URIUtil.java b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/util/URIUtil.java
new file mode 100644
index 0000000000..a7f26003b0
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/java/org/apache/tuscany/sca/binding/notification/util/URIUtil.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.util;
+
+import java.net.URI;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class URIUtil {
+
+ public static String getPath(URI uri) {
+ String path = null;
+
+ if (uri.isOpaque()) {
+ path = "/" + uri.getSchemeSpecificPart();
+ }
+ else if (uri.isAbsolute()) {
+ path = uri.getPath();
+ } else {
+ path = "/" + uri.getPath();
+ }
+
+ return path;
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/main/resources/META-INF/services/org.apache.tuscany.sca.core.ModuleActivator b/branches/sca-java-1.2.1/modules/binding-notification/src/main/resources/META-INF/services/org.apache.tuscany.sca.core.ModuleActivator
new file mode 100644
index 0000000000..5e5ce97054
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/main/resources/META-INF/services/org.apache.tuscany.sca.core.ModuleActivator
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# Implementation class for the ExtensionActivator
+org.apache.tuscany.sca.binding.notification.NotificationBindingModuleActivator
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/test/java/org/apache/tuscany/sca/binding/notification/encoding/AxiomTestCase.java b/branches/sca-java-1.2.1/modules/binding-notification/src/test/java/org/apache/tuscany/sca/binding/notification/encoding/AxiomTestCase.java
new file mode 100644
index 0000000000..a7e864cb55
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/test/java/org/apache/tuscany/sca/binding/notification/encoding/AxiomTestCase.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import java.io.ByteArrayInputStream;
+import java.io.StringWriter;
+
+import junit.framework.TestCase;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+import org.junit.Assert;
+
+public class AxiomTestCase extends TestCase {
+
+ private static String wsnt = "http://docs.oasis-open.org/wsn/b-2";
+ private static String wsa = "http://schemas.xmlsoap.org/ws/2004/08/addressing";
+ private static String testUrl1 = "http://localhost:8081/test";
+ private static String testUrl2 = "http://localhost:8082/test";
+ private static String testNewProducerResponse =
+ "<wsnt:NewProducerResponse xmlns:wsnt=\"" + wsnt + "\" ConsumerSequenceType=\"EndConsumers\">" +
+ "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl1 + "</wsa:Address>" +
+ "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl2 + "</wsa:Address>" +
+ "</wsnt:NewProducerResponse>";
+
+ public void testAxiom() {
+ try {
+ StAXOMBuilder builder = new StAXOMBuilder(new ByteArrayInputStream(testNewProducerResponse.getBytes()));
+ OMElement element = builder.getDocumentElement();
+ Assert.assertNotNull(element);
+
+ StringWriter sw = new StringWriter();
+ element.serialize(sw);
+ sw.flush();
+ Assert.assertEquals(sw.toString(),testNewProducerResponse);
+ }
+ catch(Throwable e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/branches/sca-java-1.2.1/modules/binding-notification/src/test/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingTestCase.java b/branches/sca-java-1.2.1/modules/binding-notification/src/test/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingTestCase.java
new file mode 100644
index 0000000000..4f51175407
--- /dev/null
+++ b/branches/sca-java-1.2.1/modules/binding-notification/src/test/java/org/apache/tuscany/sca/binding/notification/encoding/EncodingTestCase.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.notification.encoding;
+
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.Iterator;
+
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class EncodingTestCase extends TestCase {
+
+ private static String wsnt = "http://docs.oasis-open.org/wsn/b-2";
+ private static String wsa = "http://schemas.xmlsoap.org/ws/2004/08/addressing";
+ private static String testUrl = "http://localhost:8080/test";
+ private static String testUrl1 = "http://localhost:8081/test";
+ private static String testUrl2 = "http://localhost:8082/test";
+ private static String bid1 = "UUID1";
+ private static String bid2 = "UUID2";
+ private static String testSubscribe =
+ "<wsnt:Subscribe xmlns:wsnt=\"" + wsnt + "\">" +
+ "<wsnt:ConsumerReference xmlns:wsnt=\"" + wsnt + "\">" +
+ "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" +
+ "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl + "</wsa:Address>" +
+ "</wsa:EndpointReference>" +
+ "</wsnt:ConsumerReference>" +
+ "</wsnt:Subscribe>";
+ private static String testNewConsumerResponse =
+ "<wsnt:NewConsumerResponse xmlns:wsnt=\"" + wsnt + "\" ProducerSequenceType=\"EndProducers\">" +
+ "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" +
+ "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl + "</wsa:Address>" +
+ "</wsa:EndpointReference>" +
+ "</wsnt:NewConsumerResponse>";
+ private static String testNewProducerResponse =
+ "<wsnt:NewProducerResponse xmlns:wsnt=\"" + wsnt + "\" ConsumerSequenceType=\"EndConsumers\">" +
+ "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" +
+ "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl1 + "</wsa:Address>" +
+ "</wsa:EndpointReference>" +
+ "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" +
+ "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl2 + "</wsa:Address>" +
+ "</wsa:EndpointReference>" +
+ "</wsnt:NewProducerResponse>";
+ private static String testNoProducersResponse =
+ "<wsnt:NewConsumerResponse xmlns:wsnt=\"" + wsnt + "\" ProducerSequenceType=\"NoProducers\" />";
+ private static String testNewBroker =
+ "<wsnt:NewBroker xmlns:wsnt=\"" + wsnt + "\">" +
+ "<wsnt:BrokerConsumerReference xmlns:wsnt=\"" + wsnt + "\">" +
+ "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" +
+ "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl1 + "</wsa:Address>" +
+ "<wsa:ReferenceProperties xmlns:wsa=\"" + wsa + "\">" +
+ "<wsnt:BrokerID xmlns:wsnt=\"" + wsnt + "\">" + bid1 + "</wsnt:BrokerID>" +
+ "</wsa:ReferenceProperties>" +
+ "</wsa:EndpointReference>" +
+ "</wsnt:BrokerConsumerReference>" +
+ "<wsnt:BrokerProducerReference xmlns:wsnt=\"" + wsnt + "\">" +
+ "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" +
+ "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl2 + "</wsa:Address>" +
+ "<wsa:ReferenceProperties xmlns:wsa=\"" + wsa + "\">" +
+ "<wsnt:BrokerID xmlns:wsnt=\"" + wsnt + "\">" + bid2 + "</wsnt:BrokerID>" +
+ "</wsa:ReferenceProperties>" +
+ "</wsa:EndpointReference>" +
+ "</wsnt:BrokerProducerReference>" +
+ "</wsnt:NewBroker>";
+ private static String testNewBrokerResponse1 =
+ "<wsnt:NewBrokerResponse xmlns:wsnt=\"" + wsnt + "\" FirstBroker=\"true\">" +
+ "<wsnt:EndConsumers xmlns:wsnt=\"" + wsnt + "\" ConsumerSequenceType=\"EndConsumers\">" +
+ "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" +
+ "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl1 + "</wsa:Address>" +
+ "</wsa:EndpointReference>" +
+ "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" +
+ "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl2 + "</wsa:Address>" +
+ "</wsa:EndpointReference>" +
+ "</wsnt:EndConsumers>" +
+ "<wsnt:EndProducers xmlns:wsnt=\"" + wsnt + "\" ProducerSequenceType=\"NoProducers\" />" +
+ "</wsnt:NewBrokerResponse>";
+ private static String testNewBrokerResponse2 =
+ "<wsnt:NewBrokerResponse xmlns:wsnt=\"" + wsnt + "\" FirstBroker=\"false\">" +
+ "<wsnt:Brokers xmlns:wsnt=\"" + wsnt + "\">" +
+ "<wsnt:Broker xmlns:wsnt=\"" + wsnt + "\">" +
+ "<wsnt:BrokerConsumerReference xmlns:wsnt=\"" + wsnt + "\">" +
+ "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" +
+ "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl1 + "</wsa:Address>" +
+ "<wsa:ReferenceProperties xmlns:wsa=\"" + wsa + "\">" +
+ "<wsnt:BrokerID xmlns:wsnt=\"" + wsnt + "\">" + bid1 + "</wsnt:BrokerID>" +
+ "</wsa:ReferenceProperties>" +
+ "</wsa:EndpointReference>" +
+ "</wsnt:BrokerConsumerReference>" +
+ "<wsnt:BrokerProducerReference xmlns:wsnt=\"" + wsnt + "\">" +
+ "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" +
+ "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl2 + "</wsa:Address>" +
+ "<wsa:ReferenceProperties xmlns:wsa=\"" + wsa + "\">" +
+ "<wsnt:BrokerID xmlns:wsnt=\"" + wsnt + "\">" + bid2 + "</wsnt:BrokerID>" +
+ "</wsa:ReferenceProperties>" +
+ "</wsa:EndpointReference>" +
+ "</wsnt:BrokerProducerReference>" +
+ "</wsnt:Broker>" +
+ "</wsnt:Brokers>" +
+ "</wsnt:NewBrokerResponse>";
+ private static String testRemoveBroker =
+ "<wsnt:RemoveBroker xmlns:wsnt=\"" + wsnt + "\">" +
+ "<wsnt:BrokerConsumerReference xmlns:wsnt=\"" + wsnt + "\">" +
+ "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" +
+ "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl + "</wsa:Address>" +
+ "<wsa:ReferenceProperties xmlns:wsa=\"" + wsa + "\">" +
+ "<wsnt:BrokerID xmlns:wsnt=\"" + wsnt + "\">" + bid1 + "</wsnt:BrokerID>" +
+ "</wsa:ReferenceProperties>" +
+ "</wsa:EndpointReference>" +
+ "</wsnt:BrokerConsumerReference>" +
+ "<wsnt:NeighborBrokerConsumers xmlns:wsnt=\"" + wsnt + "\" ConsumerSequenceType=\"BrokerConsumers\">" +
+ "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" +
+ "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl1 + "</wsa:Address>" +
+ "</wsa:EndpointReference>" +
+ "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" +
+ "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl2 + "</wsa:Address>" +
+ "</wsa:EndpointReference>" +
+ "</wsnt:NeighborBrokerConsumers>" +
+ "</wsnt:RemoveBroker>";
+ private static String testReplaceBrokerConnection =
+ "<wsnt:ReplaceBrokerConnection xmlns:wsnt=\"" + wsnt + "\">" +
+ "<wsnt:RemovedBroker xmlns:wsnt=\"" + wsnt + "\">" +
+ "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" +
+ "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl + "</wsa:Address>" +
+ "<wsa:ReferenceProperties xmlns:wsa=\"" + wsa + "\">" +
+ "<wsnt:BrokerID xmlns:wsnt=\"" + wsnt + "\">" + bid1 + "</wsnt:BrokerID>" +
+ "</wsa:ReferenceProperties>" +
+ "</wsa:EndpointReference>" +
+ "</wsnt:RemovedBroker>" +
+ "<wsnt:Neighbors xmlns:wsnt=\"" + wsnt + "\">" +
+ "<wsnt:Broker xmlns:wsnt=\"" + wsnt + "\">" +
+ "<wsnt:BrokerConsumerReference xmlns:wsnt=\"" + wsnt + "\">" +
+ "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" +
+ "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl1 + "</wsa:Address>" +
+ "<wsa:ReferenceProperties xmlns:wsa=\"" + wsa + "\">" +
+ "<wsnt:BrokerID xmlns:wsnt=\"" + wsnt + "\">" + bid1 + "</wsnt:BrokerID>" +
+ "</wsa:ReferenceProperties>" +
+ "</wsa:EndpointReference>" +
+ "</wsnt:BrokerConsumerReference>" +
+ "<wsnt:BrokerProducerReference xmlns:wsnt=\"" + wsnt + "\">" +
+ "<wsa:EndpointReference xmlns:wsa=\"" + wsa + "\">" +
+ "<wsa:Address xmlns:wsa=\"" + wsa + "\">" + testUrl2 + "</wsa:Address>" +
+ "<wsa:ReferenceProperties xmlns:wsa=\"" + wsa + "\">" +
+ "<wsnt:BrokerID xmlns:wsnt=\"" + wsnt + "\">" + bid2 + "</wsnt:BrokerID>" +
+ "</wsa:ReferenceProperties>" +
+ "</wsa:EndpointReference>" +
+ "</wsnt:BrokerProducerReference>" +
+ "</wsnt:Broker>" +
+ "</wsnt:Neighbors>" +
+ "</wsnt:ReplaceBrokerConnection>";
+
+ public void testSubscribe() throws Exception {
+ DefaultEncodingRegistry der = new DefaultEncodingRegistry();
+ SubscribeEnDeCoder sed = new SubscribeEnDeCoder(der);
+ sed.start();
+ ConsumerReferenceEnDeCoder cred = new ConsumerReferenceEnDeCoder(der);
+ cred.start();
+ EndpointReferenceEnDeCoder epred = new EndpointReferenceEnDeCoder(der);
+ epred.start();
+ EndpointAddressEnDeCoder eaed = new EndpointAddressEnDeCoder(der);
+ eaed.start();
+
+ XMLInputFactory xif = XMLInputFactory.newInstance();
+ XMLStreamReader reader = xif.createXMLStreamReader(new StringReader(testSubscribe));
+ reader.next();
+ Subscribe subscribe = (Subscribe)der.decode(reader);
+ Assert.assertEquals(subscribe.getConsumerReference().getReference().getEndpointAddress().getAddress().toString(), testUrl);
+
+ XMLOutputFactory xof = XMLOutputFactory.newInstance();
+ StringWriter testWriter = new StringWriter();
+ XMLStreamWriter writer = xof.createXMLStreamWriter(testWriter);
+ der.encode(subscribe, writer);
+ writer.flush();
+ String encoded = testWriter.toString();
+ Assert.assertEquals(encoded, testSubscribe);
+ }
+
+ public void testNewConsumerResponse() throws Exception {
+ DefaultEncodingRegistry der = new DefaultEncodingRegistry();
+ NewConsumerResponseEnDeCoder ncred = new NewConsumerResponseEnDeCoder(der);
+ ncred.start();
+ EndpointReferenceEnDeCoder epred = new EndpointReferenceEnDeCoder(der);
+ epred.start();
+ EndpointAddressEnDeCoder eaed = new EndpointAddressEnDeCoder(der);
+ eaed.start();
+
+ XMLInputFactory xif = XMLInputFactory.newInstance();
+ XMLStreamReader reader = xif.createXMLStreamReader(new StringReader(testNewConsumerResponse));
+ reader.next();
+ NewConsumerResponse newConsumerResponse = (NewConsumerResponse)der.decode(reader);
+ Assert.assertEquals(newConsumerResponse.getSequenceType(), "EndProducers");
+ Assert.assertEquals(newConsumerResponse.getReferenceSequence().iterator().next().getEndpointAddress().getAddress().toString(),
+ testUrl);
+
+ XMLOutputFactory xof = XMLOutputFactory.newInstance();
+ StringWriter testWriter = new StringWriter();
+ XMLStreamWriter writer = xof.createXMLStreamWriter(testWriter);
+ der.encode(newConsumerResponse, writer);
+ writer.flush();
+ String encoded = testWriter.toString();
+ Assert.assertEquals(encoded, testNewConsumerResponse);
+ }
+
+ public void testNoProducersResponse() throws Exception {
+ DefaultEncodingRegistry der = new DefaultEncodingRegistry();
+ NewConsumerResponseEnDeCoder ncred = new NewConsumerResponseEnDeCoder(der);
+ ncred.start();
+ EndpointReferenceEnDeCoder epred = new EndpointReferenceEnDeCoder(der);
+ epred.start();
+ EndpointAddressEnDeCoder eaed = new EndpointAddressEnDeCoder(der);
+ eaed.start();
+
+ XMLInputFactory xif = XMLInputFactory.newInstance();
+ XMLStreamReader reader = xif.createXMLStreamReader(new StringReader(testNoProducersResponse));
+ reader.next();
+ NewConsumerResponse newConsumerResponse = (NewConsumerResponse)der.decode(reader);
+ Assert.assertEquals(newConsumerResponse.getSequenceType(), "NoProducers");
+
+ XMLOutputFactory xof = XMLOutputFactory.newInstance();
+ StringWriter testWriter = new StringWriter();
+ XMLStreamWriter writer = xof.createXMLStreamWriter(testWriter);
+ der.encode(newConsumerResponse, writer);
+ writer.flush();
+ String encoded = testWriter.toString();
+ Assert.assertEquals(encoded, testNoProducersResponse);
+ }
+
+ public void testNewProducerResponse() throws Exception {
+ DefaultEncodingRegistry der = new DefaultEncodingRegistry();
+ NewProducerResponseEnDeCoder npred = new NewProducerResponseEnDeCoder(der);
+ npred.start();
+ EndpointReferenceEnDeCoder epred = new EndpointReferenceEnDeCoder(der);
+ epred.start();
+ EndpointAddressEnDeCoder eaed = new EndpointAddressEnDeCoder(der);
+ eaed.start();
+
+ XMLInputFactory xif = XMLInputFactory.newInstance();
+ XMLStreamReader reader = xif.createXMLStreamReader(new StringReader(testNewProducerResponse));
+ reader.next();
+ NewProducerResponse newProducerResponse = (NewProducerResponse)der.decode(reader);
+ Assert.assertEquals(newProducerResponse.getSequenceType(), "EndConsumers");
+ Iterator<EndpointReference> it = newProducerResponse.getReferenceSequence().iterator();
+ it.next();
+ Assert.assertEquals(it.next().getEndpointAddress().getAddress().toString(), testUrl2);
+
+ XMLOutputFactory xof = XMLOutputFactory.newInstance();
+ StringWriter testWriter = new StringWriter();
+ XMLStreamWriter writer = xof.createXMLStreamWriter(testWriter);
+ der.encode(newProducerResponse, writer);
+ writer.flush();
+ String encoded = testWriter.toString();
+ Assert.assertEquals(encoded, testNewProducerResponse);
+ }
+
+ public void testNewBroker() throws Exception {
+ DefaultEncodingRegistry der = new DefaultEncodingRegistry();
+ NewBrokerEnDeCoder nbed = new NewBrokerEnDeCoder(der);
+ nbed.start();
+ BrokerConsumerReferenceEnDeCoder bcred = new BrokerConsumerReferenceEnDeCoder(der);
+ bcred.start();
+ EndpointReferenceEnDeCoder epred = new EndpointReferenceEnDeCoder(der);
+ epred.start();
+ EndpointAddressEnDeCoder eaed = new EndpointAddressEnDeCoder(der);
+ eaed.start();
+ ReferencePropertiesEnDeCoder rped = new ReferencePropertiesEnDeCoder(der);
+ rped.start();
+ BrokerIDEnDeCoder bied = new BrokerIDEnDeCoder(der);
+ bied.start();
+ BrokerProducerReferenceEnDeCoder bpred = new BrokerProducerReferenceEnDeCoder(der);
+ bpred.start();
+
+ XMLInputFactory xif = XMLInputFactory.newInstance();
+ XMLStreamReader reader = xif.createXMLStreamReader(new StringReader(testNewBroker));
+ reader.next();
+ NewBroker newBroker = (NewBroker)der.decode(reader);
+ Assert.assertEquals(newBroker.getBrokerConsumerReference().getReference().getEndpointAddress().getAddress().toString(),
+ testUrl1);
+ Assert.assertEquals(newBroker.getBrokerProducerReference().getReference().getEndpointAddress().getAddress().toString(),
+ testUrl2);
+
+ XMLOutputFactory xof = XMLOutputFactory.newInstance();
+ StringWriter testWriter = new StringWriter();
+ XMLStreamWriter writer = xof.createXMLStreamWriter(testWriter);
+ der.encode(newBroker, writer);
+ writer.flush();
+ String encoded = testWriter.toString();
+ Assert.assertEquals(encoded, testNewBroker);
+ }
+
+ public void testNewBrokerRespnse1() throws Exception {
+ DefaultEncodingRegistry der = new DefaultEncodingRegistry();
+ NewBrokerResponseEnDeCoder nbred = new NewBrokerResponseEnDeCoder(der);
+ nbred.start();
+ EndProducersEnDeCoder epred = new EndProducersEnDeCoder(der);
+ epred.start();
+ EndConsumersEnDeCoder ecred = new EndConsumersEnDeCoder(der);
+ ecred.start();
+ EndpointReferenceEnDeCoder ered = new EndpointReferenceEnDeCoder(der);
+ ered.start();
+ EndpointAddressEnDeCoder eaed = new EndpointAddressEnDeCoder(der);
+ eaed.start();
+
+ XMLInputFactory xif = XMLInputFactory.newInstance();
+ XMLStreamReader reader = xif.createXMLStreamReader(new StringReader(testNewBrokerResponse1));
+ reader.next();
+ NewBrokerResponse newBrokerResponse = (NewBrokerResponse)der.decode(reader);
+ Assert.assertFalse(!newBrokerResponse.isFirstBroker());
+ Assert.assertEquals(newBrokerResponse.getEndProducers().getSequenceType(), "NoProducers");
+ Assert.assertEquals(newBrokerResponse.getEndConsumers().getSequenceType(), "EndConsumers");
+ Assert.assertEquals(newBrokerResponse.getEndConsumers().getReferenceSequence().get(0).getEndpointAddress().getAddress().toString(),
+ testUrl1);
+
+ XMLOutputFactory xof = XMLOutputFactory.newInstance();
+ StringWriter testWriter = new StringWriter();
+ XMLStreamWriter writer = xof.createXMLStreamWriter(testWriter);
+ der.encode(newBrokerResponse, writer);
+ writer.flush();
+ String encoded = testWriter.toString();
+ Assert.assertEquals(encoded, testNewBrokerResponse1);
+ }
+
+ public void testNewBrokerRespnse2() throws Exception {
+ DefaultEncodingRegistry der = new DefaultEncodingRegistry();
+ NewBrokerResponseEnDeCoder nbred = new NewBrokerResponseEnDeCoder(der);
+ nbred.start();
+ BrokersEnDeCoder bsed = new BrokersEnDeCoder(der);
+ bsed.start();
+ BrokerEnDeCoder bed = new BrokerEnDeCoder(der);
+ bed.start();
+ BrokerConsumerReferenceEnDeCoder bcred = new BrokerConsumerReferenceEnDeCoder(der);
+ bcred.start();
+ BrokerProducerReferenceEnDeCoder bpred = new BrokerProducerReferenceEnDeCoder(der);
+ bpred.start();
+ EndpointReferenceEnDeCoder epred = new EndpointReferenceEnDeCoder(der);
+ epred.start();
+ EndpointAddressEnDeCoder eaed = new EndpointAddressEnDeCoder(der);
+ eaed.start();
+ ReferencePropertiesEnDeCoder rped = new ReferencePropertiesEnDeCoder(der);
+ rped.start();
+ BrokerIDEnDeCoder bied = new BrokerIDEnDeCoder(der);
+ bied.start();
+
+ XMLInputFactory xif = XMLInputFactory.newInstance();
+ XMLStreamReader reader = xif.createXMLStreamReader(new StringReader(testNewBrokerResponse2));
+ reader.next();
+ NewBrokerResponse newBrokerResponse = (NewBrokerResponse)der.decode(reader);
+ Assert.assertFalse(newBrokerResponse.isFirstBroker());
+ Assert.assertEquals(newBrokerResponse.getBrokers().getBrokerSequence().get(0)
+ .getBrokerConsumerReference().getReference().getEndpointAddress().getAddress().toString(),
+ testUrl1);
+ Assert.assertEquals(newBrokerResponse.getBrokers().getBrokerSequence().get(0)
+ .getBrokerProducerReference().getReference().getEndpointAddress().getAddress().toString(),
+ testUrl2);
+
+ XMLOutputFactory xof = XMLOutputFactory.newInstance();
+ StringWriter testWriter = new StringWriter();
+ XMLStreamWriter writer = xof.createXMLStreamWriter(testWriter);
+ der.encode(newBrokerResponse, writer);
+ writer.flush();
+ String encoded = testWriter.toString();
+ Assert.assertEquals(encoded, testNewBrokerResponse2);
+ }
+
+ public void testRemoveBroker() throws Exception {
+ DefaultEncodingRegistry der = new DefaultEncodingRegistry();
+ RemoveBrokerEnDeCoder rbed = new RemoveBrokerEnDeCoder(der);
+ rbed.start();
+ BrokerConsumerReferenceEnDeCoder bcred = new BrokerConsumerReferenceEnDeCoder(der);
+ bcred.start();
+ EndpointReferenceEnDeCoder epred = new EndpointReferenceEnDeCoder(der);
+ epred.start();
+ EndpointAddressEnDeCoder eaed = new EndpointAddressEnDeCoder(der);
+ eaed.start();
+ ReferencePropertiesEnDeCoder rped = new ReferencePropertiesEnDeCoder(der);
+ rped.start();
+ BrokerIDEnDeCoder bied = new BrokerIDEnDeCoder(der);
+ bied.start();
+ NeighborBrokerConsumersEnDeCoder nbced = new NeighborBrokerConsumersEnDeCoder(der);
+ nbced.start();
+
+ XMLInputFactory xif = XMLInputFactory.newInstance();
+ XMLStreamReader reader = xif.createXMLStreamReader(new StringReader(testRemoveBroker));
+ reader.next();
+ RemoveBroker removeBroker = (RemoveBroker)der.decode(reader);
+ Assert.assertEquals(removeBroker.getBrokerConsumerReference().getReference().getEndpointAddress().getAddress().toString(),
+ testUrl);
+ NeighborBrokerConsumers neighborBrokerConsumers = removeBroker.getNeighborBrokerConsumers();
+ Assert.assertEquals(neighborBrokerConsumers.getSequenceType(), "BrokerConsumers");
+ Iterator<EndpointReference> it = neighborBrokerConsumers.getReferenceSequence().iterator();
+ it.next();
+ Assert.assertEquals(it.next().getEndpointAddress().getAddress().toString(), testUrl2);
+
+ XMLOutputFactory xof = XMLOutputFactory.newInstance();
+ StringWriter testWriter = new StringWriter();
+ XMLStreamWriter writer = xof.createXMLStreamWriter(testWriter);
+ der.encode(removeBroker, writer);
+ writer.flush();
+ String encoded = testWriter.toString();
+ Assert.assertEquals(encoded, testRemoveBroker);
+ }
+
+ public void testReplaceBrokerConnection() throws Exception {
+ DefaultEncodingRegistry der = new DefaultEncodingRegistry();
+ ReplaceBrokerConnectionEnDeCoder rbced = new ReplaceBrokerConnectionEnDeCoder(der);
+ rbced.start();
+ RemovedBrokerEnDeCoder rbed = new RemovedBrokerEnDeCoder(der);
+ rbed.start();
+ EndpointReferenceEnDeCoder epred = new EndpointReferenceEnDeCoder(der);
+ epred.start();
+ EndpointAddressEnDeCoder eaed = new EndpointAddressEnDeCoder(der);
+ eaed.start();
+ ReferencePropertiesEnDeCoder rped = new ReferencePropertiesEnDeCoder(der);
+ rped.start();
+ BrokerIDEnDeCoder bied = new BrokerIDEnDeCoder(der);
+ bied.start();
+ BrokerEnDeCoder bed = new BrokerEnDeCoder(der);
+ bed.start();
+ BrokerConsumerReferenceEnDeCoder bcred = new BrokerConsumerReferenceEnDeCoder(der);
+ bcred.start();
+ BrokerProducerReferenceEnDeCoder bpred = new BrokerProducerReferenceEnDeCoder(der);
+ bpred.start();
+ NeighborsEnDeCoder nced = new NeighborsEnDeCoder(der);
+ nced.start();
+
+ XMLInputFactory xif = XMLInputFactory.newInstance();
+ XMLStreamReader reader = xif.createXMLStreamReader(new StringReader(testReplaceBrokerConnection));
+ reader.next();
+ ReplaceBrokerConnection replaceBrokerConnection = (ReplaceBrokerConnection)der.decode(reader);
+ Assert.assertEquals(replaceBrokerConnection.getRemovedBroker().getReference().getEndpointAddress().getAddress().toString(),
+ testUrl);
+ Neighbors neighbors = replaceBrokerConnection.getNeighbors();
+ Assert.assertEquals(neighbors.getBrokerSequence().get(0)
+ .getBrokerConsumerReference().getReference().getEndpointAddress().getAddress().toString(),
+ testUrl1);
+ Assert.assertEquals(neighbors.getBrokerSequence().get(0)
+ .getBrokerProducerReference().getReference().getEndpointAddress().getAddress().toString(),
+ testUrl2);
+
+ XMLOutputFactory xof = XMLOutputFactory.newInstance();
+ StringWriter testWriter = new StringWriter();
+ XMLStreamWriter writer = xof.createXMLStreamWriter(testWriter);
+ der.encode(replaceBrokerConnection, writer);
+ writer.flush();
+ String encoded = testWriter.toString();
+ Assert.assertEquals(encoded, testReplaceBrokerConnection);
+ }
+}