summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorfmoga <fmoga@13f79535-47bb-0310-9956-ffa450edef68>2011-07-09 05:17:19 +0000
committerfmoga <fmoga@13f79535-47bb-0310-9956-ffa450edef68>2011-07-09 05:17:19 +0000
commit4be113c01318385ca91a48ee0b3cfd1917f39966 (patch)
tree57ffa617b832e2b591f6fc46624f2e3c39de577c
parent9b367c6e64775c18f2a4ca9ead2a2d389e1ca0c6 (diff)
Add websocket message multiplexing via persistent connections.
git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1144596 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java43
-rw-r--r--sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingRequest.java57
-rw-r--r--sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingResponse.java47
-rw-r--r--sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketOperationDispatcher.java95
-rw-r--r--sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketRequestHandler.java76
-rw-r--r--sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java1
-rw-r--r--sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java66
-rw-r--r--sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java87
-rw-r--r--sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceInvoker.java58
9 files changed, 370 insertions, 160 deletions
diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java
index e21a2402b1..492dfae3dd 100644
--- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java
+++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.tuscany.sca.binding.websocket.runtime;
import java.util.ArrayList;
@@ -111,17 +129,20 @@ public class JSONUtil {
return builder.toString();
}
- /**
- * Decode JSON to a given Java type.
- *
- * @param responseJSON
- * the json to convert
- * @param returnType
- * the return type to convert to
- * @return the converted object
- */
- public static Object decodeResponse(String responseJSON, Class<?> returnType) {
- return gson.fromJson(responseJSON, returnType);
+ public static String encodeRequest(WebSocketBindingRequest request) {
+ return gson.toJson(request);
+ }
+
+ public static WebSocketBindingRequest decodeRequest(String jsonRequest) {
+ return gson.fromJson(jsonRequest, WebSocketBindingRequest.class);
+ }
+
+ public static WebSocketBindingResponse decodeResponse(String operationResponse) {
+ return gson.fromJson(operationResponse, WebSocketBindingResponse.class);
+ }
+
+ public static Object decodeResponsePayload(String payload, Class<?> returnType) {
+ return gson.fromJson(payload, returnType);
}
}
diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingRequest.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingRequest.java
new file mode 100644
index 0000000000..19b875805a
--- /dev/null
+++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingRequest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.websocket.runtime;
+
+public class WebSocketBindingRequest {
+
+ private String requestId;
+ private String uri;
+ private String payload;
+
+ public WebSocketBindingRequest(String requestId, String uri, String payload) {
+ this.requestId = requestId;
+ this.uri = uri;
+ this.payload = payload;
+ }
+
+ public String getRequestId() {
+ return requestId;
+ }
+
+ public void setRequestId(String requestId) {
+ this.requestId = requestId;
+ }
+
+ public String getUri() {
+ return uri;
+ }
+
+ public void setUri(String uri) {
+ this.uri = uri;
+ }
+
+ public String getPayload() {
+ return payload;
+ }
+
+ public void setPayload(String payload) {
+ this.payload = payload;
+ }
+
+}
diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingResponse.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingResponse.java
new file mode 100644
index 0000000000..51664879a3
--- /dev/null
+++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketBindingResponse.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.websocket.runtime;
+
+public class WebSocketBindingResponse {
+
+ private String uri;
+ private String payload;
+
+ public WebSocketBindingResponse(String uri, String payload) {
+ this.uri = uri;
+ this.payload = payload;
+ }
+
+ public String getUri() {
+ return uri;
+ }
+
+ public void setUri(String uri) {
+ this.uri = uri;
+ }
+
+ public String getPayload() {
+ return payload;
+ }
+
+ public void setPayload(String payload) {
+ this.payload = payload;
+ }
+
+}
diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketOperationDispatcher.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketOperationDispatcher.java
new file mode 100644
index 0000000000..c33b3ca12a
--- /dev/null
+++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketOperationDispatcher.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.websocket.runtime;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.tuscany.sca.interfacedef.Operation;
+import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+import org.apache.websocket.ServerWebSocket;
+import org.apache.websocket.WebSocket;
+import org.apache.websocket.WebSocketApplication;
+import org.apache.websocket.WebSocketException;
+
+public class WebSocketOperationDispatcher implements WebSocketApplication<SocketChannel> {
+
+ private ServerWebSocket server;
+ private Map<String, RuntimeEndpoint> endpoints = new HashMap<String, RuntimeEndpoint>();
+ private Map<String, Operation> operations = new HashMap<String, Operation>();
+
+ public WebSocketOperationDispatcher(ServerWebSocket server) {
+ this.server = server;
+ }
+
+ public void addOperation(String uri, RuntimeEndpoint endpoint, Operation operation) {
+ endpoints.put(uri, endpoint);
+ operations.put(uri, operation);
+ }
+
+ public Operation getOperation(String uri) {
+ return operations.get(uri);
+ }
+
+ public RuntimeEndpoint getEndpoint(String uri) {
+ return endpoints.get(uri);
+ }
+
+ @Override
+ public void onConnection(WebSocket<SocketChannel> socket) {
+ // release server thread
+ new Thread(new WebSocketRequestHandler(socket, this)).start();
+ }
+
+ @Override
+ public void onHandshakeError(IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ @Override
+ public String acceptProtocol(String protocol) {
+ // don't accept any subprotocols
+ return null;
+ }
+
+ @Override
+ public boolean acceptOrigin(String origin) {
+ // accept all clients
+ return true;
+ }
+
+ @Override
+ public Map<String, String> acceptExtensions(Map<String, String> headers) throws WebSocketException {
+ // don't accept any extensions
+ return null;
+ }
+
+ public void shutdown() {
+ try {
+ server.close();
+ endpoints.clear();
+ operations.clear();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+} \ No newline at end of file
diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketRequestHandler.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketRequestHandler.java
new file mode 100644
index 0000000000..59d8af3133
--- /dev/null
+++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebSocketRequestHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tuscany.sca.binding.websocket.runtime;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.channels.SocketChannel;
+
+import org.apache.tuscany.sca.interfacedef.Operation;
+import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+import org.apache.websocket.WebSocket;
+
+public class WebSocketRequestHandler implements Runnable {
+
+ private WebSocket<SocketChannel> websocket;
+ private WebSocketOperationDispatcher dispatcher;
+
+ public WebSocketRequestHandler(WebSocket<SocketChannel> socket, WebSocketOperationDispatcher dispatcher) {
+ this.websocket = socket;
+ this.dispatcher = dispatcher;
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ // TODO use Java NIO selectors on websockets
+ String request = websocket.receiveText();
+ String response = handleRequest(request);
+ websocket.sendText(response);
+ } catch (IOException e) {
+ if (!websocket.isOpen()) {
+ System.out.println("Client disconnected. Stopping WebSocketRequestHandler.");
+ break;
+ } else {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ // TODO handle request asynchronously in a background thread
+ private String handleRequest(String jsonRequest) {
+ WebSocketBindingRequest request = JSONUtil.decodeRequest(jsonRequest);
+ RuntimeEndpoint wire = dispatcher.getEndpoint(request.getUri());
+ Operation operation = dispatcher.getOperation(request.getUri());
+ System.out.println("handleRequest - " + request.getUri() + " - " + wire + " - " + operation);
+ String jsonParams = request.getPayload();
+ Object[] args = JSONUtil.decodeJsonParamsForOperation(jsonParams, operation);
+ try {
+ Object operationResponse = wire.invoke(operation, args);
+ String payload = JSONUtil.encodeResponse(operationResponse);
+ WebSocketBindingResponse response = new WebSocketBindingResponse(request.getRequestId(), payload);
+ return JSONUtil.encodeResponse(response);
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java
index 4057e46956..d2e95f9623 100644
--- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java
+++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java
@@ -41,6 +41,7 @@ public class WebsocketReferenceBindingProvider implements ReferenceBindingProvid
}
public void stop() {
+ WebsocketReferenceInvoker.shutdown();
}
public InterfaceContract getBindingInterfaceContract() {
diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java
index a9c2505ea8..84025491fe 100644
--- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java
+++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java
@@ -23,6 +23,9 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.SocketChannel;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.tuscany.sca.assembly.EndpointReference;
import org.apache.tuscany.sca.interfacedef.Operation;
@@ -33,6 +36,9 @@ import org.apache.websocket.WebSocketConnector;
public class WebsocketReferenceInvoker implements Invoker {
+ // TODO add timeout mechanism for persistent connections
+ private static ConcurrentMap<String, WebSocket<SocketChannel>> persistentWebsockets = new ConcurrentHashMap<String, WebSocket<SocketChannel>>();
+
protected Operation operation;
protected EndpointReference endpoint;
@@ -43,41 +49,57 @@ public class WebsocketReferenceInvoker implements Invoker {
public Message invoke(Message msg) {
try {
- return doInvoke(msg);
+ WebSocket<SocketChannel> websocket = initWebsocketConnection(endpoint.getBinding().getURI());
+ return doInvoke(msg, websocket);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
- public Message doInvoke(Message msg) {
+ private WebSocket<SocketChannel> initWebsocketConnection(String uri) throws IOException, URISyntaxException {
+ WebSocket<SocketChannel> websocket = null;
+ synchronized (persistentWebsockets) {
+ websocket = persistentWebsockets.get(uri);
+ if (websocket == null) {
+ WebSocketConnector connector = new WebSocketConnector();
+ websocket = connector.connect(new URI(uri), null, "apache-tuscany", null);
+ persistentWebsockets.put(uri, websocket);
+ }
+ }
+ return websocket;
+ }
+
+ public Message doInvoke(Message msg, WebSocket<SocketChannel> websocket) throws IOException {
String componentName = endpoint.getTargetEndpoint().getComponent().getName();
String serviceName = endpoint.getTargetEndpoint().getService().getName();
String operationName = operation.getName();
- String uri = endpoint.getBinding().getURI() + "/" + componentName + "/" + serviceName + "/" + operationName;
- String jsonParams = JSONUtil.encodeRequestParams((Object[]) msg.getBody());
- String responseJSON = invokeWebSocketRequest(uri, jsonParams);
+ String uri = componentName + "/" + serviceName + "/" + operationName;
+ String payload = JSONUtil.encodeRequestParams((Object[]) msg.getBody());
+ WebSocketBindingRequest request = new WebSocketBindingRequest(UUID.randomUUID().toString(), uri, payload);
+
+ String operationResponse = invokeViaWebsocket(websocket, JSONUtil.encodeRequest(request));
+
+ WebSocketBindingResponse response = JSONUtil.decodeResponse(operationResponse);
Class<?> returnType = operation.getOutputType().getLogical().get(0).getPhysical();
- Object response = JSONUtil.decodeResponse(responseJSON, returnType);
- msg.setBody(response);
+ Object invocationResponse = JSONUtil.decodeResponsePayload(response.getPayload(), returnType);
+ msg.setBody(invocationResponse);
return msg;
}
- private String invokeWebSocketRequest(String uri, String jsonParams) {
- try {
- return doInvokeWebSocketRequest(uri, jsonParams);
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (URISyntaxException e) {
- throw new RuntimeException(e);
- }
+ private String invokeViaWebsocket(WebSocket<SocketChannel> websocket, String request) throws IOException {
+ websocket.sendText(request);
+ return websocket.receiveText();
}
- private String doInvokeWebSocketRequest(String uri, String jsonParams) throws IOException, URISyntaxException {
- WebSocketConnector connector = new WebSocketConnector();
- WebSocket<SocketChannel> websocket = connector.connect(new URI(uri), null, "apache-tuscany", null);
- websocket.sendText(jsonParams);
- String jsonResponse = websocket.receiveText();
- websocket.close();
- return jsonResponse;
+ public static void shutdown() {
+ for (WebSocket<SocketChannel> websocket : persistentWebsockets.values()) {
+ try {
+ websocket.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ persistentWebsockets.clear();
}
+
}
diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java
index 461b364b2f..fe5efcffef 100644
--- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java
+++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java
@@ -22,24 +22,20 @@ package org.apache.tuscany.sca.binding.websocket.runtime;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.nio.channels.SocketChannel;
+import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.apache.tuscany.sca.interfacedef.InterfaceContract;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.provider.ServiceBindingProvider;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
import org.apache.websocket.ServerWebSocket;
-import org.apache.websocket.WebSocket;
-import org.apache.websocket.WebSocketApplication;
-import org.apache.websocket.WebSocketException;
public class WebsocketServiceBindingProvider implements ServiceBindingProvider {
+ private static Map<String, WebSocketOperationDispatcher> dispatchers = new HashMap<String, WebSocketOperationDispatcher>();
+
private RuntimeEndpoint endpoint;
- private static ConcurrentMap<String, ServerWebSocket> websocketServers = new ConcurrentHashMap<String, ServerWebSocket>();
public WebsocketServiceBindingProvider(RuntimeEndpoint endpoint) {
this.endpoint = endpoint;
@@ -47,24 +43,26 @@ public class WebsocketServiceBindingProvider implements ServiceBindingProvider {
public void start() {
String uri = endpoint.getBinding().getURI();
- ServerWebSocket server = initWebSocketServerForURI(uri);
+ WebSocketOperationDispatcher dispatcher = initDispatcherForURI(uri);
String component = endpoint.getComponent().getName();
String service = endpoint.getService().getName();
for (Operation op : getBindingInterfaceContract().getInterface().getOperations()) {
String operation = op.getName();
- server.register("/" + component + "/" + service + "/" + operation, new WebSocketEndpoint(endpoint, op));
- System.out.println("Registered websocket endpoint for /" + component + "/" + service + "/" + operation);
+ dispatcher.addOperation(component + "/" + service + "/" + operation, endpoint, op);
}
}
- private ServerWebSocket initWebSocketServerForURI(String uri) {
- ServerWebSocket server = websocketServers.get(uri);
- if (server == null) {
+ private WebSocketOperationDispatcher initDispatcherForURI(String uri) {
+ WebSocketOperationDispatcher dispatcher = dispatchers.get(uri);
+ if (dispatcher == null) {
try {
- server = new ServerWebSocket(new URI(uri));
- websocketServers.put(uri, server);
- System.out.println("Starting websocket server at " + uri + "...");
+ ServerWebSocket server = new ServerWebSocket(new URI(uri));
+ System.out.println("Starting websocket server " + server + " at " + uri + "...");
+ dispatcher = new WebSocketOperationDispatcher(server);
+ System.out.println("Created new dispatcher for " + uri + " " + dispatcher);
+ dispatchers.put(uri, dispatcher);
+ server.register("/", dispatcher);
new Thread(server).start();
} catch (IOException e) {
throw new RuntimeException(e);
@@ -72,20 +70,14 @@ public class WebsocketServiceBindingProvider implements ServiceBindingProvider {
throw new RuntimeException(e);
}
}
- return server;
+ return dispatcher;
}
public void stop() {
- if (websocketServers != null) {
- for (ServerWebSocket server : websocketServers.values()) {
- try {
- server.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- websocketServers.clear();
+ for (WebSocketOperationDispatcher dispatcher : dispatchers.values()) {
+ dispatcher.shutdown();
}
+ dispatchers.clear();
}
public InterfaceContract getBindingInterfaceContract() {
@@ -96,47 +88,4 @@ public class WebsocketServiceBindingProvider implements ServiceBindingProvider {
return false;
}
- public class WebSocketEndpoint implements WebSocketApplication<SocketChannel> {
-
- private RuntimeEndpoint endpoint;
- private Operation operation;
-
- public WebSocketEndpoint(RuntimeEndpoint endpoint, Operation operation) {
- this.endpoint = endpoint;
- this.operation = operation;
- }
-
- @Override
- public void onConnection(WebSocket<SocketChannel> socket) {
- // handle request in a non-blocking fashion releasing the server
- // thread
- new Thread(new WebsocketServiceInvoker(endpoint, operation, socket)).start();
- }
-
- @Override
- public void onHandshakeError(IOException e) {
- System.out.println("Handshake error!\n");
- e.printStackTrace();
- }
-
- @Override
- public String acceptProtocol(String protocol) {
- // don't accept any subprotocols
- return null;
- }
-
- @Override
- public boolean acceptOrigin(String origin) {
- // accept all clients
- return true;
- }
-
- @Override
- public Map<String, String> acceptExtensions(Map<String, String> headers) throws WebSocketException {
- // don't accept any extensions
- return null;
- }
-
- }
-
}
diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceInvoker.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceInvoker.java
deleted file mode 100644
index e7022e273a..0000000000
--- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceInvoker.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tuscany.sca.binding.websocket.runtime;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.nio.channels.SocketChannel;
-
-import org.apache.tuscany.sca.interfacedef.Operation;
-import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
-import org.apache.websocket.WebSocket;
-
-public class WebsocketServiceInvoker implements Runnable {
-
- private RuntimeEndpoint wire;
- private Operation operation;
- private WebSocket<SocketChannel> websocket;
-
- public WebsocketServiceInvoker(RuntimeEndpoint wire, Operation operation, WebSocket<SocketChannel> websocket) {
- this.wire = wire;
- this.operation = operation;
- this.websocket = websocket;
- }
-
- @Override
- public void run() {
- try {
- String jsonParams = websocket.receiveText();
- Object[] args = JSONUtil.decodeJsonParamsForOperation(jsonParams, operation);
- Object response = wire.invoke(operation, args);
- String jsonResponse = JSONUtil.encodeResponse(response);
- websocket.sendText(jsonResponse);
- websocket.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (InvocationTargetException e) {
- throw new RuntimeException(e);
- }
- }
-
-}