summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java
diff options
context:
space:
mode:
Diffstat (limited to 'sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java')
-rw-r--r--sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java87
1 files changed, 18 insertions, 69 deletions
diff --git a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java
index 461b364b2f..fe5efcffef 100644
--- a/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java
+++ b/sca-java-2.x/contrib/modules/binding-websocket/src/main/java/org/apache/tuscany/sca/binding/websocket/runtime/WebsocketServiceBindingProvider.java
@@ -22,24 +22,20 @@ package org.apache.tuscany.sca.binding.websocket.runtime;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.nio.channels.SocketChannel;
+import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.apache.tuscany.sca.interfacedef.InterfaceContract;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.provider.ServiceBindingProvider;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
import org.apache.websocket.ServerWebSocket;
-import org.apache.websocket.WebSocket;
-import org.apache.websocket.WebSocketApplication;
-import org.apache.websocket.WebSocketException;
public class WebsocketServiceBindingProvider implements ServiceBindingProvider {
+ private static Map<String, WebSocketOperationDispatcher> dispatchers = new HashMap<String, WebSocketOperationDispatcher>();
+
private RuntimeEndpoint endpoint;
- private static ConcurrentMap<String, ServerWebSocket> websocketServers = new ConcurrentHashMap<String, ServerWebSocket>();
public WebsocketServiceBindingProvider(RuntimeEndpoint endpoint) {
this.endpoint = endpoint;
@@ -47,24 +43,26 @@ public class WebsocketServiceBindingProvider implements ServiceBindingProvider {
public void start() {
String uri = endpoint.getBinding().getURI();
- ServerWebSocket server = initWebSocketServerForURI(uri);
+ WebSocketOperationDispatcher dispatcher = initDispatcherForURI(uri);
String component = endpoint.getComponent().getName();
String service = endpoint.getService().getName();
for (Operation op : getBindingInterfaceContract().getInterface().getOperations()) {
String operation = op.getName();
- server.register("/" + component + "/" + service + "/" + operation, new WebSocketEndpoint(endpoint, op));
- System.out.println("Registered websocket endpoint for /" + component + "/" + service + "/" + operation);
+ dispatcher.addOperation(component + "/" + service + "/" + operation, endpoint, op);
}
}
- private ServerWebSocket initWebSocketServerForURI(String uri) {
- ServerWebSocket server = websocketServers.get(uri);
- if (server == null) {
+ private WebSocketOperationDispatcher initDispatcherForURI(String uri) {
+ WebSocketOperationDispatcher dispatcher = dispatchers.get(uri);
+ if (dispatcher == null) {
try {
- server = new ServerWebSocket(new URI(uri));
- websocketServers.put(uri, server);
- System.out.println("Starting websocket server at " + uri + "...");
+ ServerWebSocket server = new ServerWebSocket(new URI(uri));
+ System.out.println("Starting websocket server " + server + " at " + uri + "...");
+ dispatcher = new WebSocketOperationDispatcher(server);
+ System.out.println("Created new dispatcher for " + uri + " " + dispatcher);
+ dispatchers.put(uri, dispatcher);
+ server.register("/", dispatcher);
new Thread(server).start();
} catch (IOException e) {
throw new RuntimeException(e);
@@ -72,20 +70,14 @@ public class WebsocketServiceBindingProvider implements ServiceBindingProvider {
throw new RuntimeException(e);
}
}
- return server;
+ return dispatcher;
}
public void stop() {
- if (websocketServers != null) {
- for (ServerWebSocket server : websocketServers.values()) {
- try {
- server.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- websocketServers.clear();
+ for (WebSocketOperationDispatcher dispatcher : dispatchers.values()) {
+ dispatcher.shutdown();
}
+ dispatchers.clear();
}
public InterfaceContract getBindingInterfaceContract() {
@@ -96,47 +88,4 @@ public class WebsocketServiceBindingProvider implements ServiceBindingProvider {
return false;
}
- public class WebSocketEndpoint implements WebSocketApplication<SocketChannel> {
-
- private RuntimeEndpoint endpoint;
- private Operation operation;
-
- public WebSocketEndpoint(RuntimeEndpoint endpoint, Operation operation) {
- this.endpoint = endpoint;
- this.operation = operation;
- }
-
- @Override
- public void onConnection(WebSocket<SocketChannel> socket) {
- // handle request in a non-blocking fashion releasing the server
- // thread
- new Thread(new WebsocketServiceInvoker(endpoint, operation, socket)).start();
- }
-
- @Override
- public void onHandshakeError(IOException e) {
- System.out.println("Handshake error!\n");
- e.printStackTrace();
- }
-
- @Override
- public String acceptProtocol(String protocol) {
- // don't accept any subprotocols
- return null;
- }
-
- @Override
- public boolean acceptOrigin(String origin) {
- // accept all clients
- return true;
- }
-
- @Override
- public Map<String, String> acceptExtensions(Map<String, String> headers) throws WebSocketException {
- // don't accept any extensions
- return null;
- }
-
- }
-
}