From d4fd639b6abd000a0de4f201df7d748a1bb86e99 Mon Sep 17 00:00:00 2001 From: fmoga Date: Fri, 8 Jul 2011 01:34:33 +0000 Subject: Initial implementation of binding.websocket with functional sample (test). git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1144121 13f79535-47bb-0310-9956-ffa450edef68 --- .../sca/binding/websocket/runtime/JSONUtil.java | 123 +++++++++++++++++++++ .../runtime/WebsocketReferenceBindingProvider.java | 5 +- .../runtime/WebsocketReferenceInvoker.java | 42 ++++++- .../runtime/WebsocketServiceBindingProvider.java | 100 ++++++++++++++++- .../websocket/runtime/WebsocketServiceInvoker.java | 38 +++++-- .../binding/websocket/runtime/WebsocketStash.java | 44 -------- 6 files changed, 286 insertions(+), 66 deletions(-) create mode 100644 sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java delete mode 100644 sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketStash.java (limited to 'sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org') diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java new file mode 100644 index 0000000000..30a14dd8bb --- /dev/null +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java @@ -0,0 +1,123 @@ +package org.apache.tuscany.sca.binding.websocket.runtime; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.tuscany.sca.interfacedef.DataType; +import org.apache.tuscany.sca.interfacedef.Operation; + +import com.google.gson.Gson; + +/** + * Helper class to facilitate JSON convertions. + */ +public class JSONUtil { + + private static Gson gson = new Gson(); + + /** + * Convert request parameters from JSON to operation parameter types. + * + * @param jsonData + * parameters in JSON array format + * @param operation + * the operation to invoke + * @return an array of objects + */ + public static Object[] decodeJsonParamsForOperation(String jsonData, Operation operation) { + Object[] args = new Object[operation.getInputType().getLogical().size()]; + final String[] json = parseArray(jsonData); + int index = 0; + for (final DataType dataType : operation.getInputType().getLogical()) { + args[index] = gson.fromJson(json[index], dataType.getPhysical()); + index++; + } + return args; + } + + /** + * Split the JSON array containing the arguments for the method call in + * order to avoid converting JSON to Object[]. Converting each object + * separately to it's corresponding type avoids type mismatch problems at + * service invocation. + * + * @param jsonArray + * the JSON array + * @return an array of JSON formatted strings + */ + private static String[] parseArray(String jsonArray) { + List objects = new ArrayList(); + int bracketNum = 0; + int parNum = 0; + int startPos = 1; + for (int i = 0; i < jsonArray.length(); i++) { + switch (jsonArray.charAt(i)) { + case '{': + bracketNum++; + break; + case '}': + bracketNum--; + break; + case '[': + parNum++; + break; + case ']': + parNum--; + break; + case ',': + if ((bracketNum == 0) && (parNum == 1)) { + objects.add(jsonArray.substring(startPos, i)); + startPos = i + 1; + } + } + } + objects.add(jsonArray.substring(startPos, jsonArray.length() - 1)); + return objects.toArray(new String[] {}); + } + + private JSONUtil() { + } + + /** + * Converts a Java object to JSON format. + * + * @param response + * the response to convert + * @return the object in JSON format + */ + public static String encodeResponse(Object response) { + return gson.toJson(response); + } + + /** + * Convert request parameters as JSON array. + * + * @param params + * request parameters + * @return request parameters as JSON array + */ + public static String encodeRequestParams(Object[] params) { + StringBuilder builder = new StringBuilder(); + for (int index = 0; index < params.length; index++) { + Object param = params[index]; + builder.append(index == 0 ? "[" : ","); + builder.append(gson.toJson(param)); + } + builder.append("]"); + return builder.toString(); + } + + /** + * Decode JSON to a given Java type. + * + * @param responseJSON + * the json to convert + * @param returnType + * the return type to convert to + * @return the converted object + */ + public static Object decodeResponse(String responseJSON, Class returnType) { + return gson.fromJson(responseJSON, returnType); + } + +} diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java index 2766c3d991..4057e46956 100644 --- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java @@ -28,12 +28,11 @@ import org.apache.tuscany.sca.provider.ReferenceBindingProvider; public class WebsocketReferenceBindingProvider implements ReferenceBindingProvider { private EndpointReference endpoint; - private InterfaceContract contract; public WebsocketReferenceBindingProvider(EndpointReference endpoint) { this.endpoint = endpoint; } - + public Invoker createInvoker(Operation operation) { return new WebsocketReferenceInvoker(operation, endpoint); } @@ -45,7 +44,7 @@ public class WebsocketReferenceBindingProvider implements ReferenceBindingProvid } public InterfaceContract getBindingInterfaceContract() { - return contract; + return endpoint.getComponentReferenceInterfaceContract(); } public boolean supportsOneWayInvocation() { diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java index a87c87dfb2..a9c2505ea8 100644 --- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java @@ -19,13 +19,20 @@ package org.apache.tuscany.sca.binding.websocket.runtime; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.channels.SocketChannel; + import org.apache.tuscany.sca.assembly.EndpointReference; import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.invocation.Invoker; import org.apache.tuscany.sca.invocation.Message; +import org.apache.websocket.WebSocket; +import org.apache.websocket.WebSocketConnector; public class WebsocketReferenceInvoker implements Invoker { - + protected Operation operation; protected EndpointReference endpoint; @@ -36,16 +43,41 @@ public class WebsocketReferenceInvoker implements Invoker { public Message invoke(Message msg) { try { - return doInvoke(msg); - } catch (Exception e) { throw new RuntimeException(e); } } public Message doInvoke(Message msg) { - WebsocketServiceInvoker fi = WebsocketStash.getService(endpoint.getBinding().getURI()); - return fi.invokeService(msg); + String componentName = endpoint.getTargetEndpoint().getComponent().getName(); + String serviceName = endpoint.getTargetEndpoint().getService().getName(); + String operationName = operation.getName(); + String uri = endpoint.getBinding().getURI() + "/" + componentName + "/" + serviceName + "/" + operationName; + String jsonParams = JSONUtil.encodeRequestParams((Object[]) msg.getBody()); + String responseJSON = invokeWebSocketRequest(uri, jsonParams); + Class returnType = operation.getOutputType().getLogical().get(0).getPhysical(); + Object response = JSONUtil.decodeResponse(responseJSON, returnType); + msg.setBody(response); + return msg; + } + + private String invokeWebSocketRequest(String uri, String jsonParams) { + try { + return doInvokeWebSocketRequest(uri, jsonParams); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + private String doInvokeWebSocketRequest(String uri, String jsonParams) throws IOException, URISyntaxException { + WebSocketConnector connector = new WebSocketConnector(); + WebSocket websocket = connector.connect(new URI(uri), null, "apache-tuscany", null); + websocket.sendText(jsonParams); + String jsonResponse = websocket.receiveText(); + websocket.close(); + return jsonResponse; } } diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java index f5ed85a30a..efab714e5c 100644 --- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java @@ -19,33 +19,125 @@ package org.apache.tuscany.sca.binding.websocket.runtime; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.channels.SocketChannel; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + import org.apache.tuscany.sca.interfacedef.InterfaceContract; +import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.provider.ServiceBindingProvider; import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.apache.websocket.ServerWebSocket; +import org.apache.websocket.WebSocket; +import org.apache.websocket.WebSocketApplication; +import org.apache.websocket.WebSocketException; public class WebsocketServiceBindingProvider implements ServiceBindingProvider { private RuntimeEndpoint endpoint; - private InterfaceContract contract; + private static ConcurrentMap websocketServers = new ConcurrentHashMap(); public WebsocketServiceBindingProvider(RuntimeEndpoint endpoint) { this.endpoint = endpoint; } public void start() { - WebsocketStash.addService(endpoint.getBinding().getURI(), new WebsocketServiceInvoker(endpoint)); + String uri = endpoint.getBinding().getURI(); + ServerWebSocket server = initWebSocketServerForURI(uri); + String component = endpoint.getComponent().getName(); + String service = endpoint.getService().getName(); + for (Operation op : getBindingInterfaceContract().getInterface().getOperations()) { + String operation = op.getName(); + server.register("/" + component + "/" + service + "/" + operation, new WebSocketEndpoint(endpoint, op)); + System.out.println("Registered websocket endpoint for /" + component + "/" + service + "/" + operation); + } + + } + + private ServerWebSocket initWebSocketServerForURI(String uri) { + ServerWebSocket server = websocketServers.get(uri); + if (server == null) { + try { + server = new ServerWebSocket(new URI(uri)); + websocketServers.put(uri, server); + System.out.println("Starting websocket server at " + uri + "..."); + new Thread(server).start(); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + return server; } public void stop() { - WebsocketStash.removeService(endpoint.getBinding().getURI()); + if (websocketServers != null) { + for (ServerWebSocket server : websocketServers.values()) { + try { + server.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + websocketServers.clear(); + websocketServers = null; + } } public InterfaceContract getBindingInterfaceContract() { - return contract; + return endpoint.getService().getInterfaceContract(); } public boolean supportsOneWayInvocation() { return false; } + public class WebSocketEndpoint implements WebSocketApplication { + + private RuntimeEndpoint endpoint; + private Operation operation; + + public WebSocketEndpoint(RuntimeEndpoint endpoint, Operation operation) { + this.endpoint = endpoint; + this.operation = operation; + } + + @Override + public void onConnection(WebSocket socket) { + // handle request in a non-blocking fashion releasing the server + // thread + new Thread(new WebsocketServiceInvoker(endpoint, operation, socket)).start(); + } + + @Override + public void onHandshakeError(IOException e) { + System.out.println("Handshake error!\n"); + e.printStackTrace(); + } + + @Override + public String acceptProtocol(String protocol) { + // don't accept any subprotocols + return null; + } + + @Override + public boolean acceptOrigin(String origin) { + // accept all clients + return true; + } + + @Override + public Map acceptExtensions(Map headers) throws WebSocketException { + // don't accept any extensions + return null; + } + + } + } diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceInvoker.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceInvoker.java index 410312a538..e7022e273a 100644 --- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceInvoker.java +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceInvoker.java @@ -19,22 +19,40 @@ package org.apache.tuscany.sca.binding.websocket.runtime; -import org.apache.tuscany.sca.invocation.Message; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.nio.channels.SocketChannel; + +import org.apache.tuscany.sca.interfacedef.Operation; import org.apache.tuscany.sca.runtime.RuntimeEndpoint; +import org.apache.websocket.WebSocket; -public class WebsocketServiceInvoker { +public class WebsocketServiceInvoker implements Runnable { private RuntimeEndpoint wire; - - public WebsocketServiceInvoker(RuntimeEndpoint wire) { + private Operation operation; + private WebSocket websocket; + + public WebsocketServiceInvoker(RuntimeEndpoint wire, Operation operation, WebSocket websocket) { this.wire = wire; + this.operation = operation; + this.websocket = websocket; } - /** - * Send the request down the wire to invoke the service - */ - public Message invokeService(Message msg) { - return wire.invoke(msg); + @Override + public void run() { + try { + String jsonParams = websocket.receiveText(); + Object[] args = JSONUtil.decodeJsonParamsForOperation(jsonParams, operation); + Object response = wire.invoke(operation, args); + String jsonResponse = JSONUtil.encodeResponse(response); + websocket.sendText(jsonResponse); + websocket.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (InvocationTargetException e) { + throw new RuntimeException(e); + } } - + } diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketStash.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketStash.java deleted file mode 100644 index 0023cf915d..0000000000 --- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketStash.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tuscany.sca.binding.websocket.runtime; - -import java.util.HashMap; -import java.util.Map; - -/** - * Simplistic static Map to share service endpoints with references - */ -public class WebsocketStash { - - private static Map services = new HashMap(); - - public static void addService(String uri, WebsocketServiceInvoker WebsocketServiceInvoker) { - services.put(uri, WebsocketServiceInvoker); - } - - public static WebsocketServiceInvoker getService(String uri) { - return services.get(uri); - } - - public static void removeService(String uri) { - services.remove(uri); - } - -} -- cgit v1.2.3