diff options
author | fmoga <fmoga@13f79535-47bb-0310-9956-ffa450edef68> | 2011-07-25 08:34:41 +0000 |
---|---|---|
committer | fmoga <fmoga@13f79535-47bb-0310-9956-ffa450edef68> | 2011-07-25 08:34:41 +0000 |
commit | ae871022ad3266580e5fb169288ddbb85dc8d959 (patch) | |
tree | fb7da7a48ab830bb77b36d002b36d44dd152ac17 | |
parent | 225512f078d5d55456d22d277d1077d81e74aff4 (diff) |
Add multiple response support to binding.websocket.
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1150577 13f79535-47bb-0310-9956-ffa450edef68
12 files changed, 208 insertions, 75 deletions
diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java index 3fdb1ee1c1..30ad5a3ffe 100644 --- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java @@ -33,12 +33,12 @@ public class JSONUtil { private static Gson gson = new Gson(); - public static String encodeMessage(WebSocketBindingMessage request) { + public static String encodeMessage(WebsocketBindingMessage request) { return gson.toJson(request); } - public static WebSocketBindingMessage decodeMessage(String jsonRequest) { - return gson.fromJson(jsonRequest, WebSocketBindingMessage.class); + public static WebsocketBindingMessage decodeMessage(String jsonRequest) { + return gson.fromJson(jsonRequest, WebsocketBindingMessage.class); } public static String encodePayload(Object payload) { diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/TuscanyWebSocket.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/TuscanyWebsocket.java index c267d9e6a4..b32949d2f4 100644 --- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/TuscanyWebSocket.java +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/TuscanyWebsocket.java @@ -20,42 +20,60 @@ package org.apache.tuscany.sca.binding.websocket.runtime; import java.io.IOException; +import java.util.UUID; import org.eclipse.jetty.websocket.WebSocket; -public class TuscanyWebSocket implements WebSocket, WebSocket.OnTextMessage { +public class TuscanyWebsocket implements WebSocket, WebSocket.OnTextMessage { + private String id; private Connection connection; - private WebSocketBindingDispatcher dispatcher; + private WebsocketBindingDispatcher dispatcher; - public TuscanyWebSocket(WebSocketBindingDispatcher dispatcher) { + public TuscanyWebsocket(WebsocketBindingDispatcher dispatcher) { this.dispatcher = dispatcher; } @Override public void onOpen(Connection connection) { this.connection = connection; + this.id = UUID.randomUUID().toString(); + WebsocketConnectionManager.addConnection(this); } @Override public void onMessage(String jsonRequest) { - WebSocketBindingMessage request = JSONUtil.decodeMessage(jsonRequest); + WebsocketBindingMessage request = JSONUtil.decodeMessage(jsonRequest); WebsocketServiceInvoker invoker = dispatcher.dispatch(request.getOperation()); if (invoker == null) { throw new RuntimeException("No operation found for " + request.getOperation()); } else { - WebSocketBindingMessage response = invoker.invokeSync(request); - String jsonResponse = JSONUtil.encodeMessage(response); - try { - connection.sendMessage(jsonResponse); - } catch (IOException e) { - throw new RuntimeException(e); + if (!invoker.isNonBlocking()) { + WebsocketBindingMessage response = invoker.invokeSync(request); + send(response); + } else { + invoker.invokeAsync(request, this); } } } @Override public void onClose(int closeCode, String message) { + WebsocketConnectionManager.removeConnection(this); + } + + public void send(WebsocketBindingMessage message) { + try { + if (connection.isOpen()) { + connection.sendMessage(JSONUtil.encodeMessage(message)); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public String getId() { + return id; } } diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketBindingCallback.java index 65c9139a28..877eda4027 100644 --- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketBindingCallback.java @@ -1,41 +1,28 @@ -/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tuscany.sca.binding.websocket.runtime;
-
-import org.apache.tuscany.sca.assembly.EndpointReference;
-import org.apache.tuscany.sca.interfacedef.Operation;
-import org.apache.tuscany.sca.invocation.Invoker;
-import org.apache.tuscany.sca.invocation.Message;
-
-public class WebsocketReferenceInvoker implements Invoker {
-
- protected Operation operation;
- protected EndpointReference endpoint;
-
- public WebsocketReferenceInvoker(Operation operation, EndpointReference endpoint) {
- this.operation = operation;
- this.endpoint = endpoint;
- }
-
- public Message invoke(Message msg) {
- throw new RuntimeException("Not implemented yet");
- }
-
-}
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.binding.websocket.runtime; + +import org.oasisopen.sca.annotation.Remotable; + +@Remotable +public interface WebsocketBindingCallback { + + public WebsocketStatus sendMessage(Object message); + +} diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingDispatcher.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketBindingDispatcher.java index 4e0dbaffd5..ec7b249999 100644 --- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingDispatcher.java +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketBindingDispatcher.java @@ -24,7 +24,7 @@ import java.util.Map; import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.runtime.RuntimeEndpoint; -public class WebSocketBindingDispatcher { +public class WebsocketBindingDispatcher { private Map<String, WebsocketServiceInvoker> invokers = new HashMap<String, WebsocketServiceInvoker>(); diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingMessage.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketBindingMessage.java index 1ff8c4c10e..1e7abb2aba 100644 --- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingMessage.java +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketBindingMessage.java @@ -18,12 +18,12 @@ */ package org.apache.tuscany.sca.binding.websocket.runtime; -public class WebSocketBindingMessage { +public class WebsocketBindingMessage { private String operation; private String payload; - public WebSocketBindingMessage(String operation, String payload) { + public WebsocketBindingMessage(String operation, String payload) { this.operation = operation; this.payload = payload; } diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketCallbackInvoker.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketCallbackInvoker.java new file mode 100644 index 0000000000..5edd4c9a2d --- /dev/null +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketCallbackInvoker.java @@ -0,0 +1,55 @@ +/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.binding.websocket.runtime;
+
+import org.apache.tuscany.sca.assembly.EndpointReference;
+import org.apache.tuscany.sca.core.invocation.Constants;
+import org.apache.tuscany.sca.core.invocation.impl.MessageImpl;
+import org.apache.tuscany.sca.interfacedef.Operation;
+import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.invocation.Message;
+
+public class WebsocketCallbackInvoker implements Invoker {
+
+ protected Operation operation;
+ protected EndpointReference endpoint;
+
+ public WebsocketCallbackInvoker(Operation operation, EndpointReference endpoint) {
+ this.operation = operation;
+ this.endpoint = endpoint;
+ }
+
+ public Message invoke(Message msg) {
+ String channelId = (String) msg.getHeaders().get(Constants.RELATES_TO);
+ TuscanyWebsocket websocket = WebsocketConnectionManager.getConnection(channelId);
+ Message response = new MessageImpl();
+ if (websocket == null) {
+ response.setBody(WebsocketStatus.CLOSED);
+ } else {
+ Object[] body = msg.getBody();
+ String payload = JSONUtil.encodePayload(body[0]);
+ String operation = msg.getTo().getURI();
+ WebsocketBindingMessage message = new WebsocketBindingMessage(operation, payload);
+ websocket.send(message);
+ response.setBody(WebsocketStatus.OPEN);
+ }
+ return response;
+ }
+}
diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketConnectionManager.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketConnectionManager.java new file mode 100644 index 0000000000..b22fcbe9e3 --- /dev/null +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketConnectionManager.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.binding.websocket.runtime; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class WebsocketConnectionManager { + + private static ConcurrentMap<String, TuscanyWebsocket> activeConnections = new ConcurrentHashMap<String, TuscanyWebsocket>(); + + public static void addConnection(TuscanyWebsocket websocket) { + activeConnections.put(websocket.getId(), websocket); + } + + public static void removeConnection(TuscanyWebsocket websocket) { + activeConnections.remove(websocket.getId()); + } + + public static TuscanyWebsocket getConnection(String id) { + return activeConnections.get(id); + } + + public static void clear() { + activeConnections.clear(); + } + + private WebsocketConnectionManager() { + } + +} diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java index 4057e46956..c5df6363b9 100644 --- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java @@ -34,7 +34,7 @@ public class WebsocketReferenceBindingProvider implements ReferenceBindingProvid }
public Invoker createInvoker(Operation operation) {
- return new WebsocketReferenceInvoker(operation, endpoint);
+ return new WebsocketCallbackInvoker(operation, endpoint);
}
public void start() {
diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketServer.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServer.java index e470ddfde0..b1b8251edb 100644 --- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketServer.java +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServer.java @@ -28,11 +28,11 @@ import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.websocket.WebSocket; import org.eclipse.jetty.websocket.WebSocketHandler; -public class WebSocketServer extends Server { +public class WebsocketServer extends Server { - private WebSocketBindingDispatcher dispatcher; + private WebsocketBindingDispatcher dispatcher; - public WebSocketServer(int port) throws URISyntaxException { + public WebsocketServer(int port) throws URISyntaxException { SelectChannelConnector connector = new SelectChannelConnector(); connector.setPort(port); addConnector(connector); @@ -41,15 +41,14 @@ public class WebSocketServer extends Server { @Override public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) { - System.out.println("Connection established"); - return new TuscanyWebSocket(dispatcher); + return new TuscanyWebsocket(dispatcher); } }); - dispatcher = new WebSocketBindingDispatcher(); + dispatcher = new WebsocketBindingDispatcher(); } - public WebSocketBindingDispatcher getDispatcher() { + public WebsocketBindingDispatcher getDispatcher() { return dispatcher; } diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java index e5c729dd8c..c98399c8e8 100644 --- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java @@ -33,7 +33,7 @@ public class WebsocketServiceBindingProvider implements ServiceBindingProvider { private static final int DEFAULT_PORT = 9000;
private static final String JAVASCRIPT_RESOURCE_PATH = "/org.apache.tuscany.sca.WebsocketComponentContext.js";
- private static Map<Integer, WebSocketServer> servers = new HashMap<Integer, WebSocketServer>();
+ private static Map<Integer, WebsocketServer> servers = new HashMap<Integer, WebsocketServer>();
private RuntimeEndpoint endpoint;
private ServletHost servletHost;
@@ -49,7 +49,7 @@ public class WebsocketServiceBindingProvider implements ServiceBindingProvider { port = Integer.parseInt(binding.getPort());
}
try {
- WebSocketServer server = initServerForURI(port);
+ WebsocketServer server = initServerForURI(port);
String component = endpoint.getComponent().getName();
String service = endpoint.getService().getName();
for (Operation op : getBindingInterfaceContract().getInterface().getOperations()) {
@@ -64,10 +64,10 @@ public class WebsocketServiceBindingProvider implements ServiceBindingProvider { initJavascriptResource();
}
- private WebSocketServer initServerForURI(int port) throws Exception {
- WebSocketServer server = servers.get(port);
+ private WebsocketServer initServerForURI(int port) throws Exception {
+ WebsocketServer server = servers.get(port);
if (server == null) {
- server = new WebSocketServer(port);
+ server = new WebsocketServer(port);
server.start();
servers.put(port, server);
}
@@ -82,7 +82,7 @@ public class WebsocketServiceBindingProvider implements ServiceBindingProvider { }
public void stop() {
- for (WebSocketServer server : servers.values()) {
+ for (WebsocketServer server : servers.values()) {
try {
server.stop();
} catch (Exception e) {
@@ -92,6 +92,7 @@ public class WebsocketServiceBindingProvider implements ServiceBindingProvider { servers.clear();
servletHost.removeServletMapping(JAVASCRIPT_RESOURCE_PATH);
JavascriptGenerator.clear();
+ WebsocketConnectionManager.clear();
}
public InterfaceContract getBindingInterfaceContract() {
diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceInvoker.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceInvoker.java index 7615be916c..ca859e38f4 100644 --- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceInvoker.java +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceInvoker.java @@ -40,7 +40,13 @@ package org.apache.tuscany.sca.binding.websocket.runtime; import java.lang.reflect.InvocationTargetException;
+import org.apache.tuscany.sca.assembly.EndpointReference;
+import org.apache.tuscany.sca.core.assembly.impl.RuntimeEndpointImpl;
+import org.apache.tuscany.sca.core.assembly.impl.RuntimeEndpointReferenceImpl;
+import org.apache.tuscany.sca.core.invocation.Constants;
+import org.apache.tuscany.sca.core.invocation.impl.MessageImpl;
import org.apache.tuscany.sca.interfacedef.Operation;
+import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
public class WebsocketServiceInvoker {
@@ -53,21 +59,35 @@ public class WebsocketServiceInvoker { this.endpoint = endpoint;
}
- public WebSocketBindingMessage invokeSync(WebSocketBindingMessage request) {
+ public WebsocketBindingMessage invokeSync(WebsocketBindingMessage request) {
String jsonParams = request.getPayload();
Object[] args = JSONUtil.decodePayloadForOperation(jsonParams, operation);
try {
Object operationResponse = endpoint.invoke(operation, args);
String payload = JSONUtil.encodePayload(operationResponse);
- WebSocketBindingMessage response = new WebSocketBindingMessage(request.getOperation(), payload);
+ WebsocketBindingMessage response = new WebsocketBindingMessage(request.getOperation(), payload);
return response;
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
}
}
- public void invokeAsync(WebSocketBindingMessage request, TuscanyWebSocket channel) {
- // TODO add multiple response support
+ public void invokeAsync(WebsocketBindingMessage request, TuscanyWebsocket channel) {
+ String jsonParams = request.getPayload();
+ Object[] args = JSONUtil.decodePayloadForOperation(jsonParams, operation);
+ Message msg = new MessageImpl();
+ msg.getHeaders().put(Constants.MESSAGE_ID, channel.getId());
+ msg.setBody(args);
+ EndpointReference re = new RuntimeEndpointReferenceImpl();
+ RuntimeEndpointImpl callbackEndpoint = new RuntimeEndpointImpl();
+ callbackEndpoint.setURI(request.getOperation());
+ re.setCallbackEndpoint(callbackEndpoint);
+ msg.setFrom(re);
+ endpoint.invoke(operation, msg);
+ }
+
+ public boolean isNonBlocking() {
+ return operation.isNonBlocking();
}
}
diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketStatus.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketStatus.java new file mode 100644 index 0000000000..c96685cedb --- /dev/null +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketStatus.java @@ -0,0 +1,5 @@ +package org.apache.tuscany.sca.binding.websocket.runtime; + +public enum WebsocketStatus { + OPEN, CLOSED +} |