diff options
Diffstat (limited to '')
7 files changed, 288 insertions, 70 deletions
diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java new file mode 100644 index 0000000000..30a14dd8bb --- /dev/null +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/JSONUtil.java @@ -0,0 +1,123 @@ +package org.apache.tuscany.sca.binding.websocket.runtime; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.tuscany.sca.interfacedef.DataType; +import org.apache.tuscany.sca.interfacedef.Operation; + +import com.google.gson.Gson; + +/** + * Helper class to facilitate JSON convertions. + */ +public class JSONUtil { + + private static Gson gson = new Gson(); + + /** + * Convert request parameters from JSON to operation parameter types. + * + * @param jsonData + * parameters in JSON array format + * @param operation + * the operation to invoke + * @return an array of objects + */ + public static Object[] decodeJsonParamsForOperation(String jsonData, Operation operation) { + Object[] args = new Object[operation.getInputType().getLogical().size()]; + final String[] json = parseArray(jsonData); + int index = 0; + for (final DataType<?> dataType : operation.getInputType().getLogical()) { + args[index] = gson.fromJson(json[index], dataType.getPhysical()); + index++; + } + return args; + } + + /** + * Split the JSON array containing the arguments for the method call in + * order to avoid converting JSON to Object[]. Converting each object + * separately to it's corresponding type avoids type mismatch problems at + * service invocation. + * + * @param jsonArray + * the JSON array + * @return an array of JSON formatted strings + */ + private static String[] parseArray(String jsonArray) { + List<String> objects = new ArrayList<String>(); + int bracketNum = 0; + int parNum = 0; + int startPos = 1; + for (int i = 0; i < jsonArray.length(); i++) { + switch (jsonArray.charAt(i)) { + case '{': + bracketNum++; + break; + case '}': + bracketNum--; + break; + case '[': + parNum++; + break; + case ']': + parNum--; + break; + case ',': + if ((bracketNum == 0) && (parNum == 1)) { + objects.add(jsonArray.substring(startPos, i)); + startPos = i + 1; + } + } + } + objects.add(jsonArray.substring(startPos, jsonArray.length() - 1)); + return objects.toArray(new String[] {}); + } + + private JSONUtil() { + } + + /** + * Converts a Java object to JSON format. + * + * @param response + * the response to convert + * @return the object in JSON format + */ + public static String encodeResponse(Object response) { + return gson.toJson(response); + } + + /** + * Convert request parameters as JSON array. + * + * @param params + * request parameters + * @return request parameters as JSON array + */ + public static String encodeRequestParams(Object[] params) { + StringBuilder builder = new StringBuilder(); + for (int index = 0; index < params.length; index++) { + Object param = params[index]; + builder.append(index == 0 ? "[" : ","); + builder.append(gson.toJson(param)); + } + builder.append("]"); + return builder.toString(); + } + + /** + * Decode JSON to a given Java type. + * + * @param responseJSON + * the json to convert + * @param returnType + * the return type to convert to + * @return the converted object + */ + public static Object decodeResponse(String responseJSON, Class<?> returnType) { + return gson.fromJson(responseJSON, returnType); + } + +} diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java index 2766c3d991..4057e46956 100644 --- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceBindingProvider.java @@ -28,12 +28,11 @@ import org.apache.tuscany.sca.provider.ReferenceBindingProvider; public class WebsocketReferenceBindingProvider implements ReferenceBindingProvider {
private EndpointReference endpoint;
- private InterfaceContract contract;
public WebsocketReferenceBindingProvider(EndpointReference endpoint) {
this.endpoint = endpoint;
}
-
+
public Invoker createInvoker(Operation operation) {
return new WebsocketReferenceInvoker(operation, endpoint);
}
@@ -45,7 +44,7 @@ public class WebsocketReferenceBindingProvider implements ReferenceBindingProvid }
public InterfaceContract getBindingInterfaceContract() {
- return contract;
+ return endpoint.getComponentReferenceInterfaceContract();
}
public boolean supportsOneWayInvocation() {
diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java index a87c87dfb2..a9c2505ea8 100644 --- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketReferenceInvoker.java @@ -19,13 +19,20 @@ package org.apache.tuscany.sca.binding.websocket.runtime;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.channels.SocketChannel;
+
import org.apache.tuscany.sca.assembly.EndpointReference;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.invocation.Invoker;
import org.apache.tuscany.sca.invocation.Message;
+import org.apache.websocket.WebSocket;
+import org.apache.websocket.WebSocketConnector;
public class WebsocketReferenceInvoker implements Invoker {
-
+
protected Operation operation;
protected EndpointReference endpoint;
@@ -36,16 +43,41 @@ public class WebsocketReferenceInvoker implements Invoker { public Message invoke(Message msg) {
try {
-
return doInvoke(msg);
-
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public Message doInvoke(Message msg) {
- WebsocketServiceInvoker fi = WebsocketStash.getService(endpoint.getBinding().getURI());
- return fi.invokeService(msg);
+ String componentName = endpoint.getTargetEndpoint().getComponent().getName();
+ String serviceName = endpoint.getTargetEndpoint().getService().getName();
+ String operationName = operation.getName();
+ String uri = endpoint.getBinding().getURI() + "/" + componentName + "/" + serviceName + "/" + operationName;
+ String jsonParams = JSONUtil.encodeRequestParams((Object[]) msg.getBody());
+ String responseJSON = invokeWebSocketRequest(uri, jsonParams);
+ Class<?> returnType = operation.getOutputType().getLogical().get(0).getPhysical();
+ Object response = JSONUtil.decodeResponse(responseJSON, returnType);
+ msg.setBody(response);
+ return msg;
+ }
+
+ private String invokeWebSocketRequest(String uri, String jsonParams) {
+ try {
+ return doInvokeWebSocketRequest(uri, jsonParams);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private String doInvokeWebSocketRequest(String uri, String jsonParams) throws IOException, URISyntaxException {
+ WebSocketConnector connector = new WebSocketConnector();
+ WebSocket<SocketChannel> websocket = connector.connect(new URI(uri), null, "apache-tuscany", null);
+ websocket.sendText(jsonParams);
+ String jsonResponse = websocket.receiveText();
+ websocket.close();
+ return jsonResponse;
}
}
diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java index f5ed85a30a..efab714e5c 100644 --- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java @@ -19,33 +19,125 @@ package org.apache.tuscany.sca.binding.websocket.runtime;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
import org.apache.tuscany.sca.interfacedef.InterfaceContract;
+import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.provider.ServiceBindingProvider;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+import org.apache.websocket.ServerWebSocket;
+import org.apache.websocket.WebSocket;
+import org.apache.websocket.WebSocketApplication;
+import org.apache.websocket.WebSocketException;
public class WebsocketServiceBindingProvider implements ServiceBindingProvider {
private RuntimeEndpoint endpoint;
- private InterfaceContract contract;
+ private static ConcurrentMap<String, ServerWebSocket> websocketServers = new ConcurrentHashMap<String, ServerWebSocket>();
public WebsocketServiceBindingProvider(RuntimeEndpoint endpoint) {
this.endpoint = endpoint;
}
public void start() {
- WebsocketStash.addService(endpoint.getBinding().getURI(), new WebsocketServiceInvoker(endpoint));
+ String uri = endpoint.getBinding().getURI();
+ ServerWebSocket server = initWebSocketServerForURI(uri);
+ String component = endpoint.getComponent().getName();
+ String service = endpoint.getService().getName();
+ for (Operation op : getBindingInterfaceContract().getInterface().getOperations()) {
+ String operation = op.getName();
+ server.register("/" + component + "/" + service + "/" + operation, new WebSocketEndpoint(endpoint, op));
+ System.out.println("Registered websocket endpoint for /" + component + "/" + service + "/" + operation);
+ }
+
+ }
+
+ private ServerWebSocket initWebSocketServerForURI(String uri) {
+ ServerWebSocket server = websocketServers.get(uri);
+ if (server == null) {
+ try {
+ server = new ServerWebSocket(new URI(uri));
+ websocketServers.put(uri, server);
+ System.out.println("Starting websocket server at " + uri + "...");
+ new Thread(server).start();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return server;
}
public void stop() {
- WebsocketStash.removeService(endpoint.getBinding().getURI());
+ if (websocketServers != null) {
+ for (ServerWebSocket server : websocketServers.values()) {
+ try {
+ server.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ websocketServers.clear();
+ websocketServers = null;
+ }
}
public InterfaceContract getBindingInterfaceContract() {
- return contract;
+ return endpoint.getService().getInterfaceContract();
}
public boolean supportsOneWayInvocation() {
return false;
}
+ public class WebSocketEndpoint implements WebSocketApplication<SocketChannel> {
+
+ private RuntimeEndpoint endpoint;
+ private Operation operation;
+
+ public WebSocketEndpoint(RuntimeEndpoint endpoint, Operation operation) {
+ this.endpoint = endpoint;
+ this.operation = operation;
+ }
+
+ @Override
+ public void onConnection(WebSocket<SocketChannel> socket) {
+ // handle request in a non-blocking fashion releasing the server
+ // thread
+ new Thread(new WebsocketServiceInvoker(endpoint, operation, socket)).start();
+ }
+
+ @Override
+ public void onHandshakeError(IOException e) {
+ System.out.println("Handshake error!\n");
+ e.printStackTrace();
+ }
+
+ @Override
+ public String acceptProtocol(String protocol) {
+ // don't accept any subprotocols
+ return null;
+ }
+
+ @Override
+ public boolean acceptOrigin(String origin) {
+ // accept all clients
+ return true;
+ }
+
+ @Override
+ public Map<String, String> acceptExtensions(Map<String, String> headers) throws WebSocketException {
+ // don't accept any extensions
+ return null;
+ }
+
+ }
+
}
diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceInvoker.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceInvoker.java index 410312a538..e7022e273a 100644 --- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceInvoker.java +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceInvoker.java @@ -19,22 +19,40 @@ package org.apache.tuscany.sca.binding.websocket.runtime;
-import org.apache.tuscany.sca.invocation.Message;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.channels.SocketChannel;
+
+import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+import org.apache.websocket.WebSocket;
-public class WebsocketServiceInvoker {
+public class WebsocketServiceInvoker implements Runnable {
private RuntimeEndpoint wire;
-
- public WebsocketServiceInvoker(RuntimeEndpoint wire) {
+ private Operation operation;
+ private WebSocket<SocketChannel> websocket;
+
+ public WebsocketServiceInvoker(RuntimeEndpoint wire, Operation operation, WebSocket<SocketChannel> websocket) {
this.wire = wire;
+ this.operation = operation;
+ this.websocket = websocket;
}
- /**
- * Send the request down the wire to invoke the service
- */
- public Message invokeService(Message msg) {
- return wire.invoke(msg);
+ @Override
+ public void run() {
+ try {
+ String jsonParams = websocket.receiveText();
+ Object[] args = JSONUtil.decodeJsonParamsForOperation(jsonParams, operation);
+ Object response = wire.invoke(operation, args);
+ String jsonResponse = JSONUtil.encodeResponse(response);
+ websocket.sendText(jsonResponse);
+ websocket.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
}
-
+
}
diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketStash.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketStash.java deleted file mode 100644 index 0023cf915d..0000000000 --- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketStash.java +++ /dev/null @@ -1,44 +0,0 @@ -/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tuscany.sca.binding.websocket.runtime;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Simplistic static Map to share service endpoints with references
- */
-public class WebsocketStash {
-
- private static Map<String, WebsocketServiceInvoker> services = new HashMap<String, WebsocketServiceInvoker>();
-
- public static void addService(String uri, WebsocketServiceInvoker WebsocketServiceInvoker) {
- services.put(uri, WebsocketServiceInvoker);
- }
-
- public static WebsocketServiceInvoker getService(String uri) {
- return services.get(uri);
- }
-
- public static void removeService(String uri) {
- services.remove(uri);
- }
-
-}
diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/test/resources/helloworld.composite b/sca-java-2.x/contrib/modules/binding-websocket/src/test/resources/helloworld.composite index 6ce4f87334..a3cbf5084c 100644 --- a/sca-java-2.x/contrib/modules/binding-websocket/src/test/resources/helloworld.composite +++ b/sca-java-2.x/contrib/modules/binding-websocket/src/test/resources/helloworld.composite @@ -25,15 +25,13 @@ <component name="HelloWorldComponent">
<implementation.java class="helloworld.HelloWorldImpl"/>
<service name="HelloWorldService" >
- <tuscany:binding.websocket />
+ <tuscany:binding.websocket uri="ws://127.0.0.1:5555" />
</service>
</component>
<component name="HelloWorldClient">
<implementation.java class="helloworld.HelloWorldClient"/>
- <reference name="ref" >
- <tuscany:binding.websocket uri="HelloWorldComponent/HelloWorldService"/>
- </reference>
+ <reference name="ref" target="HelloWorldComponent/HelloWorldService" />
</component>
</composite>
|