diff options
Diffstat (limited to 'tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl')
11 files changed, 1897 insertions, 0 deletions
diff --git a/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java new file mode 100644 index 0000000000..aeccfa8b05 --- /dev/null +++ b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncJDKInvocationHandler.java @@ -0,0 +1,93 @@ +/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.core.invocation.impl;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.concurrent.Future;
+
+import javax.xml.ws.AsyncHandler;
+import javax.xml.ws.Response;
+
+import org.apache.tuscany.sca.invocation.MessageFactory;
+import org.oasisopen.sca.ServiceReference;
+
+public class AsyncJDKInvocationHandler extends JDKInvocationHandler {
+ private static final long serialVersionUID = 1L;
+
+ public AsyncJDKInvocationHandler(MessageFactory messageFactory, ServiceReference<?> callableReference) {
+ super(messageFactory, callableReference);
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ if (isAsyncCallback(method)) {
+ return doInvokeAsyncCallback(proxy, method, args);
+ } else if (isAsyncPoll(method)) {
+ return doInvokeAsyncPoll(proxy, method, args);
+ } else {
+ return super.invoke(proxy, method, args);
+ }
+ }
+
+ protected boolean isAsyncCallback(Method method) {
+ if (method.getName().endsWith("Async") && (method.getReturnType().isAssignableFrom(Future.class))) {
+ if (method.getParameterTypes().length > 0) {
+ return method.getParameterTypes()[method.getParameterTypes().length-1].isAssignableFrom(AsyncHandler.class);
+ }
+ }
+ return false;
+ }
+
+ protected boolean isAsyncPoll(Method method) {
+ return method.getName().endsWith("Async") && (method.getReturnType().isAssignableFrom(Response.class));
+ }
+
+ protected AsyncResponse doInvokeAsyncPoll(Object proxy, Method asyncMethod, Object[] args) {
+ Object response;
+ boolean isException;
+ try {
+ response = super.invoke(proxy, getNonAsyncMethod(asyncMethod), args);
+ isException = false;
+ } catch (Throwable e) {
+ response = e;
+ isException = true;
+ }
+ return new AsyncResponse(response, isException);
+ }
+
+ private Object doInvokeAsyncCallback(Object proxy, Method asyncMethod, Object[] args) {
+ AsyncHandler handler = (AsyncHandler)args[args.length-1];
+ Response response = doInvokeAsyncPoll(proxy,asyncMethod,Arrays.copyOf(args, args.length-1));
+ handler.handleResponse(response);
+
+ return null;
+ }
+
+ protected Method getNonAsyncMethod(Method asyncMethod) {
+ String methodName = asyncMethod.getName().substring(0, asyncMethod.getName().length()-5);
+ for (Method m : businessInterface.getMethods()) {
+ if (methodName.equals(m.getName())) {
+ return m;
+ }
+ }
+ throw new IllegalStateException("No non-async method matching async method " + asyncMethod.getName());
+ }
+}
diff --git a/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponse.java b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponse.java new file mode 100644 index 0000000000..07086dfae3 --- /dev/null +++ b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponse.java @@ -0,0 +1,68 @@ +/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tuscany.sca.core.invocation.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.xml.ws.Response;
+
+public class AsyncResponse implements Response {
+
+ private Object response;
+ private boolean isException;
+
+ public AsyncResponse(Object response, boolean isException) {
+ this.response = response;
+ this.isException = isException;
+ }
+
+ public Map getContext() {
+ return new HashMap();
+ }
+
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return false;
+ }
+
+ public Object get() throws InterruptedException, ExecutionException {
+ if (isException) {
+ throw new ExecutionException((Throwable)response);
+ } else {
+ return response;
+ }
+ }
+
+ public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return get();
+ }
+
+ public boolean isCancelled() {
+ return false;
+ }
+
+ public boolean isDone() {
+ return true;
+ }
+
+}
diff --git a/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java new file mode 100644 index 0000000000..60914f799b --- /dev/null +++ b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/InvocationChainImpl.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; + +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.DataExchangeSemantics; +import org.apache.tuscany.sca.invocation.Interceptor; +import org.apache.tuscany.sca.invocation.InvocationChain; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Phase; +import org.apache.tuscany.sca.invocation.PhasedInterceptor; + +/** + * Default implementation of an invocation chain + * + * @version $Rev$ $Date$ + */ +public class InvocationChainImpl implements InvocationChain { + private Operation sourceOperation; + private Operation targetOperation; + private List<Node> nodes = new ArrayList<Node>(); + + private final PhaseManager phaseManager; + private boolean forReference; + private boolean allowsPassByReference; + + public InvocationChainImpl(Operation sourceOperation, Operation targetOperation, boolean forReference, PhaseManager phaseManager) { + this.targetOperation = targetOperation; + this.sourceOperation = sourceOperation; + this.forReference = forReference; + this.phaseManager = phaseManager; + } + + public Operation getTargetOperation() { + return targetOperation; + } + + public void setTargetOperation(Operation operation) { + this.targetOperation = operation; + } + + public void addInterceptor(Interceptor interceptor) { + if (interceptor instanceof PhasedInterceptor) { + PhasedInterceptor pi = (PhasedInterceptor)interceptor; + if (pi.getPhase() != null) { + addInvoker(pi.getPhase(), pi); + return; + } + } + String phase = forReference ? Phase.REFERENCE : Phase.SERVICE; + addInterceptor(phase, interceptor); + } + + public void addInvoker(Invoker invoker) { + if (invoker instanceof PhasedInterceptor) { + PhasedInterceptor pi = (PhasedInterceptor)invoker; + if (pi.getPhase() != null) { + addInvoker(pi.getPhase(), pi); + return; + } + } + String phase = forReference ? Phase.REFERENCE_BINDING : Phase.IMPLEMENTATION; + addInvoker(phase, invoker); + } + + public Invoker getHeadInvoker() { + return nodes.isEmpty() ? null : nodes.get(0).getInvoker(); + } + + /** + * @return the sourceOperation + */ + public Operation getSourceOperation() { + return sourceOperation; + } + + /** + * @param sourceOperation the sourceOperation to set + */ + public void setSourceOperation(Operation sourceOperation) { + this.sourceOperation = sourceOperation; + } + + public void addInterceptor(String phase, Interceptor interceptor) { + addInvoker(phase, interceptor); + } + + private void addInvoker(String phase, Invoker invoker) { + int index = phaseManager.getAllPhases().indexOf(phase); + if (index == -1) { + throw new IllegalArgumentException("Invalid phase name: " + phase); + } + Node node = new Node(index, invoker); + ListIterator<Node> li = nodes.listIterator(); + Node before = null, after = null; + boolean found = false; + while (li.hasNext()) { + before = after; + after = li.next(); + if (after.getPhaseIndex() > index) { + // Move back + li.previous(); + li.add(node); + found = true; + break; + } + } + if (!found) { + // Add to the end + nodes.add(node); + before = after; + after = null; + } + + // Relink the interceptors + if (before != null) { + if (before.getInvoker() instanceof Interceptor) { + ((Interceptor)before.getInvoker()).setNext(invoker); + } + } + if (after != null) { + if (invoker instanceof Interceptor) { + ((Interceptor)invoker).setNext(after.getInvoker()); + } + } + + } + + public boolean allowsPassByReference() { + if (allowsPassByReference) { + // No need to check the invokers + return true; + } + // Check if any of the invokers allows pass-by-reference + boolean allowsPBR = false; + for (Node i : nodes) { + if (i.getInvoker() instanceof DataExchangeSemantics) { + if (((DataExchangeSemantics)i.getInvoker()).allowsPassByReference()) { + allowsPBR = true; + break; + } + } + } + return allowsPBR; + } + + public void setAllowsPassByReference(boolean allowsPBR) { + this.allowsPassByReference = allowsPBR; + } + + private static class Node { + private int phaseIndex; + private Invoker invoker; + + public Node(int phaseIndex, Invoker invoker) { + super(); + this.phaseIndex = phaseIndex; + this.invoker = invoker; + } + + public int getPhaseIndex() { + return phaseIndex; + } + + public Invoker getInvoker() { + return invoker; + } + + @Override + public String toString() { + return "(" + phaseIndex + ")" + invoker; + } + } + +} diff --git a/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java new file mode 100644 index 0000000000..e64b9f3068 --- /dev/null +++ b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation.impl; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + + +import org.apache.tuscany.sca.core.assembly.impl.RuntimeWireImpl; +import org.apache.tuscany.sca.core.context.impl.CallbackServiceReferenceImpl; +import org.apache.tuscany.sca.core.context.impl.ServiceReferenceImpl; +import org.apache.tuscany.sca.invocation.InvocationChain; +import org.apache.tuscany.sca.invocation.MessageFactory; +import org.apache.tuscany.sca.runtime.RuntimeWire; +import org.oasisopen.sca.ServiceRuntimeException; + +/** + * Responsible for dispatching to a callback through a wire. <p/> TODO cache + * target invoker + * + * @version $Rev$ $Date$ + */ +public class JDKCallbackInvocationHandler extends JDKInvocationHandler { + private static final long serialVersionUID = -3350283555825935609L; + + public JDKCallbackInvocationHandler(MessageFactory messageFactory, ServiceReferenceImpl ref) { + super(messageFactory, ref); + this.fixedWire = false; + } + + @Override + @SuppressWarnings( {"unchecked"}) + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + + if (Object.class == method.getDeclaringClass()) { + return invokeObjectMethod(method, args); + } + + // obtain a dedicated wire to be used for this callback invocation + RuntimeWire wire = ((CallbackServiceReferenceImpl)callableReference).getCallbackWire(); + if (wire == null) { + //FIXME: need better exception + throw new ServiceRuntimeException("No callback wire found"); + } + + setEndpoint(((CallbackServiceReferenceImpl)callableReference).getResolvedEndpoint()); + + InvocationChain chain = getInvocationChain(method, wire); + if (chain == null) { + throw new IllegalArgumentException("No matching operation is found: " + method); + } + + try { + return invoke(chain, args, wire, wire.getEndpointReference()); + } catch (InvocationTargetException e) { + Throwable t = e.getCause(); + throw e; + } finally { + // allow the cloned wire to be reused by subsequent callbacks + ((RuntimeWireImpl)wire).releaseWire(); + } + } + +} diff --git a/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java new file mode 100644 index 0000000000..148749922e --- /dev/null +++ b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKInvocationHandler.java @@ -0,0 +1,537 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation.impl; + +import java.io.Serializable; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.assembly.EndpointReference; +import org.apache.tuscany.sca.context.ThreadMessageContext; +import org.apache.tuscany.sca.core.context.ServiceReferenceExt; +import org.apache.tuscany.sca.core.factory.InstanceWrapper; +import org.apache.tuscany.sca.core.scope.TargetResolutionException; +import org.apache.tuscany.sca.interfacedef.DataType; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.interfacedef.java.JavaOperation; +import org.apache.tuscany.sca.invocation.InvocationChain; +import org.apache.tuscany.sca.invocation.Invoker; +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.invocation.MessageFactory; +import org.apache.tuscany.sca.runtime.RuntimeWire; +import org.oasisopen.sca.SCARuntimeException; +import org.oasisopen.sca.ServiceReference; +import org.oasisopen.sca.ServiceRuntimeException; + +/** + * @version $Rev$ $Date$ + */ +public class JDKInvocationHandler implements InvocationHandler, Serializable { + private static final long serialVersionUID = -3366410500152201371L; + + protected boolean conversational; + protected MessageFactory messageFactory; + protected EndpointReference source; + protected Endpoint target; + protected RuntimeWire wire; + protected ServiceReferenceExt<?> callableReference; + protected Class<?> businessInterface; + + protected boolean fixedWire = true; + + protected transient Map<Method, InvocationChain> chains = new HashMap<Method, InvocationChain>(); + + public JDKInvocationHandler(MessageFactory messageFactory, Class<?> businessInterface, RuntimeWire wire) { + this.messageFactory = messageFactory; + this.wire = wire; + this.businessInterface = businessInterface; + init(this.wire); + } + + public JDKInvocationHandler(MessageFactory messageFactory, ServiceReference<?> callableReference) { + this.messageFactory = messageFactory; + this.callableReference = (ServiceReferenceExt<?>)callableReference; + if (callableReference != null) { + this.businessInterface = callableReference.getBusinessInterface(); + this.wire = ((ServiceReferenceExt<?>)callableReference).getRuntimeWire(); + if (wire != null) { + init(wire); + } + } + } + + protected void init(RuntimeWire wire) { + // TODO - EPR not required for OASIS + /* + if (wire != null) { + try { + // Clone the endpoint reference so that reference parameters can be changed + source = (EndpointReference)wire.getSource().clone(); + } catch (CloneNotSupportedException e) { + throw new ServiceRuntimeException(e); + } + initConversational(wire); + } + */ + } + + /* TODO - EPR - not required for OASIS + protected void initConversational(RuntimeWire wire) { + InterfaceContract contract = wire.getSource().getInterfaceContract(); + this.conversational = contract.getInterface().isConversational(); + } + */ + + public Class<?> getBusinessInterface() { + return businessInterface; + } + + + protected Object getCallbackID() { +// if (callableReference != null) { +// return callableReference.getCallbackID(); +// } else { + return null; +// } + } + + /* TODO - EPR - Not reqiured for OASIS + protected Object getCallbackObject() { + if (callableReference != null && callableReference instanceof ServiceReference) { + return ((ServiceReference)callableReference).getService(); + } else { + return null; + } + } + */ + + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if (Object.class == method.getDeclaringClass()) { + return invokeObjectMethod(method, args); + } + if (wire == null) { + throw new ServiceRuntimeException("No runtime wire is available"); + } + + if (wire.isOutOfDate()) { + wire.rebuild(); + chains.clear(); + } + + InvocationChain chain = getInvocationChain(method, wire); + + if (chain == null) { + throw new IllegalArgumentException("No matching operation is found: " + method); + } + + // The EndpointReference is not now resolved until the invocation chain + // is first created so reset the source here + source = wire.getEndpointReference(); + + // send the invocation down the wire + Object result = invoke(chain, args, wire, source); + + return result; + } + + /** + * Handle the methods on the Object.class + * @param method + * @param args + */ + protected Object invokeObjectMethod(Method method, Object[] args) throws Throwable { + String name = method.getName(); + if ("toString".equals(name)) { + return "[Proxy - " + toString() + "]"; + } else if ("equals".equals(name)) { + Object obj = args[0]; + if (obj == null) { + return false; + } + if (!Proxy.isProxyClass(obj.getClass())) { + return false; + } + return equals(Proxy.getInvocationHandler(obj)); + } else if ("hashCode".equals(name)) { + return hashCode(); + } else { + return method.invoke(this); + } + } + + /** + * Determines if the given operation matches the given method + * + * @return true if the operation matches, false if does not + */ + // FIXME: Should it be in the InterfaceContractMapper? + @SuppressWarnings("unchecked") + private static boolean match(Operation operation, Method method) { + if (operation instanceof JavaOperation) { + JavaOperation javaOp = (JavaOperation)operation; + Method m = javaOp.getJavaMethod(); + if (!method.getName().equals(m.getName())) { + return false; + } + if (method.equals(m)) { + return true; + } + } else { + if (!method.getName().equals(operation.getName())) { + return false; + } + } + + // For remotable interface, operation is not overloaded. + if (operation.getInterface().isRemotable()) { + return true; + } + + Class<?>[] params = method.getParameterTypes(); + + DataType<List<DataType>> inputType = null; + if (operation.isWrapperStyle()) { + inputType = operation.getWrapper().getUnwrappedInputType(); + } else { + inputType = operation.getInputType(); + } + List<DataType> types = inputType.getLogical(); + boolean matched = true; + if (types.size() == params.length && method.getName().equals(operation.getName())) { + for (int i = 0; i < params.length; i++) { + Class<?> clazz = params[i]; + Class<?> type = types.get(i).getPhysical(); + // Object.class.isAssignableFrom(int.class) returns false + if (type != Object.class && (!type.isAssignableFrom(clazz))) { + matched = false; + } + } + } else { + matched = false; + } + return matched; + + } + + protected synchronized InvocationChain getInvocationChain(Method method, RuntimeWire wire) { + if (fixedWire && chains.containsKey(method)) { + return chains.get(method); + } + InvocationChain found = null; + for (InvocationChain chain : wire.getInvocationChains()) { + Operation operation = chain.getSourceOperation(); + if (operation.isDynamic()) { + operation.setName(method.getName()); + found = chain; + break; + } else if (match(operation, method)) { + found = chain; + break; + } + } + if (fixedWire) { + chains.put(method, found); + } + return found; + } + + protected void setEndpoint(Endpoint endpoint) { + this.target = endpoint; + } + + protected Object invoke(InvocationChain chain, Object[] args, RuntimeWire wire, EndpointReference source) + throws Throwable { + Message msg = messageFactory.createMessage(); + msg.setFrom(source); + if (target != null) { + msg.setTo(target); + } else { + msg.setTo(wire.getEndpoint()); + } + Invoker headInvoker = chain.getHeadInvoker(); + Operation operation = chain.getTargetOperation(); + msg.setOperation(operation); + msg.setBody(args); + + Message msgContext = ThreadMessageContext.getMessageContext(); + + // TODO - EPR - not required for OASIS + //Object currentConversationID = msgContext.getFrom().getReferenceParameters().getConversationID(); + //conversationPreinvoke(msg, wire); + + handleCallback(msg, wire); + ThreadMessageContext.setMessageContext(msg); + boolean abnormalEndConversation = false; + try { + // dispatch the wire down the chain and get the response + Message resp = headInvoker.invoke(msg); + Object body = resp.getBody(); + if (resp.isFault()) { + /* TODO - EPR - not required in OASIS + // mark the conversation as ended if the exception is not a business exception + if (currentConversationID != null ){ + try { + boolean businessException = false; + + for (DataType dataType : operation.getFaultTypes()){ + if (dataType.getPhysical() == ((Throwable)body).getClass()){ + businessException = true; + break; + } + } + + if (businessException == false){ + abnormalEndConversation = true; + } + } catch (Exception ex){ + // TODO - sure what the best course of action is here. We have + // a system exception in the middle of a business exception + } + } + */ + throw (Throwable)body; + } + return body; + } finally { + //conversationPostInvoke(msg, wire, abnormalEndConversation); + ThreadMessageContext.setMessageContext(msgContext); + } + } + + /** + * @param msg + * @param wire + * @param interfaze + * @throws TargetResolutionException + */ + private void handleCallback(Message msg, RuntimeWire wire) + throws TargetResolutionException { + + //ReferenceParameters parameters = msg.getFrom().getReferenceParameters(); + //parameters.setCallbackID(getCallbackID()); + + if (msg.getFrom() == null || msg.getFrom().getCallbackEndpoint() == null) { + return; + } + + /* TODO - EPR - not required for OASIS + parameters.setCallbackReference(msg.getFrom().getCallbackEndpoint()); + + // If we are passing out a callback target + // register the calling component instance against this + // new conversation id so that stateful callbacks will be + // able to find it + Object callbackObject = getCallbackObject(); + if (conversational && callbackObject == null) { + // the component instance is already registered + // so add another registration + ScopeContainer<Object> scopeContainer = getConversationalScopeContainer(wire); + + if (scopeContainer != null && currentConversationID != null) { + scopeContainer.addWrapperReference(currentConversationID, conversation.getConversationID()); + } + } + + Interface interfaze = msg.getFrom().getCallbackEndpoint().getInterfaceContract().getInterface(); + if (callbackObject != null) { + if (callbackObject instanceof ServiceReference) { + CallableReferenceExt<?> callableReference = (CallableReferenceExt<?>)callbackObject; + EndpointReference callbackRef = callableReference.getEndpointReference(); + + // TODO - EPR - create chains on the callback reference in case this hasn't already happened + // needed as the bindings are not now matched until the chanins are created + callableReference.getRuntimeWire().getInvocationChains(); + + parameters.setCallbackReference(callbackRef); + } else { + if (interfaze != null) { + if (!interfaze.isConversational()) { + throw new IllegalArgumentException( + "Callback object for stateless callback is not a ServiceReference"); + } else { + if (!(callbackObject instanceof Serializable)) { + throw new IllegalArgumentException( + "Callback object for stateful callback is not Serializable"); + } + ScopeContainer scopeContainer = getConversationalScopeContainer(wire); + if (scopeContainer != null) { + InstanceWrapper wrapper = new CallbackObjectWrapper(callbackObject); + scopeContainer.registerWrapper(wrapper, conversation.getConversationID()); + } + parameters.setCallbackObjectID(callbackObject); + } + } + } + } + */ + } + + /** + * Pre-invoke for the conversation handling + * @param msg + * @throws TargetResolutionException + */ + /* TODO - EPR - not required for OASIS + private void conversationPreinvoke(Message msg, RuntimeWire wire) { + if (!conversational) { + // Not conversational or the conversation has been started + return; + } + + ConversationManager conversationManager = ((RuntimeWireImpl2)wire).getConversationManager(); + + if (conversation == null || conversation.getState() == ConversationState.ENDED) { + + conversation = conversationManager.startConversation(getConversationID()); + + // if this is a local wire then set up the conversation timeouts here based on the + // parameters from the component + if (wire.getEndpoint().getComponent() != null){ + conversation.initializeConversationAttributes((RuntimeComponent)wire.getEndpoint().getComponent()); + } + + // connect the conversation to the CallableReference so it can be retrieve in the future + if (callableReference != null) { + ((CallableReferenceImpl)callableReference).attachConversation(conversation); + } + } else if (conversation.isExpired()) { + throw new ConversationEndedException("Conversation " + conversation.getConversationID() + " has expired."); + } + + // if this is a local wire then schedule conversation timeouts based on the timeout + // parameters from the service implementation. If this isn't a local wire then + // the RuntimeWireInvoker will take care of this + if (wire.getEndpoint().getComponent() != null){ + conversation.updateLastReferencedTime(); + } + + msg.getFrom().getReferenceParameters().setConversationID(conversation.getConversationID()); + + } + */ + + /** + * Post-invoke for the conversation handling + * @param wire + * @param operation + * @throws TargetDestructionException + */ + /* TODO - not required for OASIS + @SuppressWarnings("unchecked") + private void conversationPostInvoke(Message msg, RuntimeWire wire, boolean abnormalEndConversation) + throws TargetDestructionException { + Operation operation = msg.getOperation(); + ConversationSequence sequence = operation.getConversationSequence(); + // We check that conversation has not already ended as there is only one + // conversation manager in the runtime and so, in the case of remote bindings, + // the conversation will already have been stopped when we get back to the client + if ((sequence == ConversationSequence.CONVERSATION_END || abnormalEndConversation) && + (conversation.getState() != ConversationState.ENDED)) { + + // remove conversation id from scope container + ScopeContainer scopeContainer = getConversationalScopeContainer(wire); + + if (scopeContainer != null) { + scopeContainer.remove(conversation.getConversationID()); + } + + conversation.end(); + } + } + + + private ScopeContainer<Object> getConversationalScopeContainer(RuntimeWire wire) { + ScopeContainer<Object> scopeContainer = null; + + RuntimeComponent runtimeComponent = (RuntimeComponent)wire.getEndpointReference().getComponent(); + + if (runtimeComponent instanceof ScopedRuntimeComponent) { + ScopedRuntimeComponent scopedRuntimeComponent = (ScopedRuntimeComponent)runtimeComponent; + ScopeContainer<Object> tmpScopeContainer = scopedRuntimeComponent.getScopeContainer(); + + if ((tmpScopeContainer != null) && (tmpScopeContainer.getScope() == Scope.CONVERSATION)) { + scopeContainer = tmpScopeContainer; + } + } + + return scopeContainer; + } + */ + + /** + * Creates a new conversation id + * + * @return the conversation id + */ + /* TODO - EPR - not required for OASIS + private Object createConversationID() { + if (getConversationID() != null) { + return getConversationID(); + } else { + return UUID.randomUUID().toString(); + } + } + */ + + /** + * @return the callableReference + */ + public ServiceReference<?> getCallableReference() { + return callableReference; + } + + /** + * @param callableReference the callableReference to set + */ + public void setCallableReference(ServiceReference<?> callableReference) { + this.callableReference = (ServiceReferenceExt<?>)callableReference; + } + + /** + * Minimal wrapper for a callback object contained in a ServiceReference + */ + private static class CallbackObjectWrapper<T> implements InstanceWrapper<T> { + + private T instance; + + private CallbackObjectWrapper(T instance) { + this.instance = instance; + } + + public T getInstance() { + return instance; + } + + public void start() { + // do nothing + } + + public void stop() { + // do nothing + } + + } + +} diff --git a/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java new file mode 100644 index 0000000000..853e28fafa --- /dev/null +++ b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKProxyFactory.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation.impl; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.List; +import java.util.concurrent.Future; + +import javax.xml.ws.AsyncHandler; +import javax.xml.ws.Response; + +import org.apache.tuscany.sca.common.java.collection.LRUCache; +import org.apache.tuscany.sca.core.LifeCycleListener; +import org.apache.tuscany.sca.core.context.impl.CallbackServiceReferenceImpl; +import org.apache.tuscany.sca.core.context.impl.ServiceReferenceImpl; +import org.apache.tuscany.sca.core.invocation.ProxyCreationException; +import org.apache.tuscany.sca.core.invocation.ProxyFactory; +import org.apache.tuscany.sca.interfacedef.InterfaceContractMapper; +import org.apache.tuscany.sca.invocation.MessageFactory; +import org.apache.tuscany.sca.runtime.RuntimeWire; +import org.oasisopen.sca.ServiceReference; + + +/** + * the default implementation of a wire service that uses JDK dynamic proxies + * + * @version $Rev$ $Date$ + */ +public class JDKProxyFactory implements ProxyFactory, LifeCycleListener { + protected InterfaceContractMapper contractMapper; + private MessageFactory messageFactory; + + public JDKProxyFactory(MessageFactory messageFactory, InterfaceContractMapper mapper) { + this.contractMapper = mapper; + this.messageFactory = messageFactory; + } + + /** + * The original createProxy method assumes that the proxy doesn't want to + * share conversation state so sets the conversation object to null + */ + public <T> T createProxy(Class<T> interfaze, RuntimeWire wire) throws ProxyCreationException { + ServiceReference<T> serviceReference = new ServiceReferenceImpl(interfaze, wire, this); + return createProxy(serviceReference); + } + + public <T> T createProxy(ServiceReference<T> callableReference) throws ProxyCreationException { + assert callableReference != null; + final Class<T> interfaze = callableReference.getBusinessInterface(); + InvocationHandler handler; + if (isAsync(interfaze)) { + handler = new AsyncJDKInvocationHandler(messageFactory, callableReference); + } else { + handler = new JDKInvocationHandler(messageFactory, callableReference); + } + // Allow privileged access to class loader. Requires RuntimePermission in security policy. + ClassLoader cl = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() { + public ClassLoader run() { + return interfaze.getClassLoader(); + } + }); + Object proxy = newProxyInstance(cl, new Class[] {interfaze}, handler); + ((ServiceReferenceImpl)callableReference).setProxy(proxy); + return interfaze.cast(proxy); + } + + private boolean isAsync(Class<?> interfaze) { + for (Method method : interfaze.getMethods()) { + if (method.getName().endsWith("Async")) { + if (method.getReturnType().isAssignableFrom(Future.class)) { + if (method.getParameterTypes().length > 0) { + if (method.getParameterTypes()[method.getParameterTypes().length-1].isAssignableFrom(AsyncHandler.class)) { + return true; + } + } + } + if (method.getReturnType().isAssignableFrom(Response.class)) { + return true; + } + } + } + return false; + } + + public <T> T createCallbackProxy(Class<T> interfaze, List<RuntimeWire> wires) throws ProxyCreationException { + ServiceReferenceImpl<T> callbackReference = new CallbackServiceReferenceImpl(interfaze, wires, this); + return callbackReference != null ? createCallbackProxy(callbackReference) : null; + } + + public <T> T createCallbackProxy(ServiceReferenceImpl<T> callbackReference) throws ProxyCreationException { + assert callbackReference != null; + Class<T> interfaze = callbackReference.getBusinessInterface(); + InvocationHandler handler = new JDKCallbackInvocationHandler(messageFactory, callbackReference); + ClassLoader cl = interfaze.getClassLoader(); + Object proxy = newProxyInstance(cl, new Class[] {interfaze}, handler); + callbackReference.setProxy(proxy); + return interfaze.cast(proxy); + } + + public <B, R extends ServiceReference<B>> R cast(B target) throws IllegalArgumentException { + InvocationHandler handler = Proxy.getInvocationHandler(target); + if (handler instanceof JDKInvocationHandler) { + return (R)((JDKInvocationHandler)handler).getCallableReference(); + } else { + throw new IllegalArgumentException("The object is not a known proxy."); + } + } + + /** + * @see org.apache.tuscany.sca.core.invocation.ProxyFactory#isProxyClass(java.lang.Class) + */ + public boolean isProxyClass(Class<?> clazz) { + return Proxy.isProxyClass(clazz); + } + + // This is a cache containing the proxy class constructor for each business interface. + // This improves performance compared to calling Proxy.newProxyInstance() + // every time that a proxy is needed. + private final LRUCache<Class<?>, Constructor<?>> cache = new LRUCache<Class<?>, Constructor<?>>(512); + + public Object newProxyInstance(ClassLoader classloader, + Class<?> interfaces[], + InvocationHandler invocationhandler) throws IllegalArgumentException { + if (interfaces.length > 1) { + // We only cache the proxy constructors with one single interface which the case in SCA where + // one reference can have one interface + return Proxy.newProxyInstance(classloader, interfaces, invocationhandler); + } + try { + if (invocationhandler == null) + throw new NullPointerException("InvocationHandler is null"); + // Lookup cached constructor. aclass[0] is the reference's business interface. + Constructor<?> proxyCTOR; + synchronized (cache) { + proxyCTOR = cache.get(interfaces[0]); + } + if (proxyCTOR == null) { + Class<?> proxyClass = Proxy.getProxyClass(classloader, interfaces); + proxyCTOR = proxyClass.getConstructor(InvocationHandler.class); + synchronized (cache) { + cache.put(interfaces[0], proxyCTOR); + } + } + return proxyCTOR.newInstance(invocationhandler); + } catch (Throwable e) { + throw new IllegalArgumentException(e); + } + } + + public void start() { + } + + public void stop() { + cache.clear(); + } +} diff --git a/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/MessageFactoryImpl.java b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/MessageFactoryImpl.java new file mode 100644 index 0000000000..6ce2ffca21 --- /dev/null +++ b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/MessageFactoryImpl.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation.impl; + +import org.apache.tuscany.sca.invocation.Message; +import org.apache.tuscany.sca.invocation.MessageFactory; + +/** + * Implementation of MessageFactory. + * + * @version $Rev$ $Date$ + */ +public class MessageFactoryImpl implements MessageFactory { + + public Message createMessage() { + return new MessageImpl(); + } + +} diff --git a/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/MessageImpl.java b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/MessageImpl.java new file mode 100644 index 0000000000..53fab3392c --- /dev/null +++ b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/MessageImpl.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation.impl; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.tuscany.sca.assembly.Endpoint; +import org.apache.tuscany.sca.assembly.EndpointReference; +import org.apache.tuscany.sca.interfacedef.Operation; +import org.apache.tuscany.sca.invocation.Message; + +/** + * The default implementation of a message flowed through a wire during an invocation + * + * @version $Rev $Date$ + */ +public class MessageImpl implements Message { + private List<Object> headers = new ArrayList<Object>(); + private Object body; + private Object messageID; + private boolean isFault; + private Operation operation; + + private EndpointReference from; + private Endpoint to; + + private Object bindingContext; + + public MessageImpl() { + // TODO - EPR - What to do by default? + //this.from = new EndpointReferenceImpl("/"); + //this.to = new EndpointReferenceImpl("/"); + this.from = null; + this.to = null; + } + + @SuppressWarnings("unchecked") + public <T> T getBody() { + return (T)body; + } + + public <T> void setBody(T body) { + this.isFault = false; + this.body = body; + } + + public Object getMessageID() { + return messageID; + } + + public void setMessageID(Object messageId) { + this.messageID = messageId; + } + + public boolean isFault() { + return isFault; + } + + public void setFaultBody(Object fault) { + this.isFault = true; + this.body = fault; + } + + public EndpointReference getFrom() { + return from; + } + + public void setFrom(EndpointReference from) { + this.from = from; + } + + public Endpoint getTo() { + return to; + } + + public void setTo(Endpoint to) { + this.to = to; + } + + public Operation getOperation() { + return operation; + } + + public void setOperation(Operation op) { + this.operation = op; + } + + public List<Object> getHeaders() { + return headers; + } + + @SuppressWarnings("unchecked") + public <T> T getBindingContext() { + return (T)bindingContext; + } + + public <T> void setBindingContext(T bindingContext) { + this.bindingContext = bindingContext; + } +} diff --git a/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/NoMethodForOperationException.java b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/NoMethodForOperationException.java new file mode 100644 index 0000000000..45f4bf52bf --- /dev/null +++ b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/NoMethodForOperationException.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation.impl; + +import org.apache.tuscany.sca.core.invocation.ProxyCreationException; + + +/** + * Thrown when an {@link org.apache.tuscany.sca.core.factory.model.Operation} cannot be mapped to a method on an interface + * @version $Rev$ $Date$ + */ +public class NoMethodForOperationException extends ProxyCreationException { + private static final long serialVersionUID = 5116536602309483679L; + + public NoMethodForOperationException() { + } + + public NoMethodForOperationException(String message) { + super(message); + } + + public NoMethodForOperationException(String message, Throwable cause) { + super(message, cause); + } + + public NoMethodForOperationException(Throwable cause) { + super(cause); + } +} diff --git a/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/PhaseManager.java b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/PhaseManager.java new file mode 100644 index 0000000000..1649eade87 --- /dev/null +++ b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/PhaseManager.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.invocation.impl; + +import static org.apache.tuscany.sca.invocation.Phase.IMPLEMENTATION; +import static org.apache.tuscany.sca.invocation.Phase.IMPLEMENTATION_POLICY; +import static org.apache.tuscany.sca.invocation.Phase.REFERENCE; +import static org.apache.tuscany.sca.invocation.Phase.REFERENCE_BINDING; +import static org.apache.tuscany.sca.invocation.Phase.REFERENCE_BINDING_POLICY; +import static org.apache.tuscany.sca.invocation.Phase.REFERENCE_BINDING_TRANSPORT; +import static org.apache.tuscany.sca.invocation.Phase.REFERENCE_BINDING_WIREFORMAT; +import static org.apache.tuscany.sca.invocation.Phase.REFERENCE_INTERFACE; +import static org.apache.tuscany.sca.invocation.Phase.REFERENCE_POLICY; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE_BINDING; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE_BINDING_OPERATION_SELECTOR; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE_BINDING_POLICY; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE_BINDING_TRANSPORT; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE_BINDING_WIREFORMAT; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE_INTERFACE; +import static org.apache.tuscany.sca.invocation.Phase.SERVICE_POLICY; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.StringTokenizer; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry; +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.UtilityExtensionPoint; +import org.apache.tuscany.sca.extensibility.ServiceDeclaration; +import org.apache.tuscany.sca.invocation.Phase; +import org.oasisopen.sca.ServiceRuntimeException; + +/** + * @version $Rev$ $Date$ + */ +public class PhaseManager { + private static final Logger log = Logger.getLogger(PhaseManager.class.getName()); + + public static final String STAGE_REFERENCE = "reference"; + public static final String STAGE_REFERENCE_BINDING = "reference.binding"; + public static final String STAGE_SERVICE_BINDING = "service.binding"; + public static final String STAGE_SERVICE = "service"; + public static final String STAGE_IMPLEMENTATION = "implementation"; + + private static final String[] SYSTEM_REFERENCE_PHASES = + {REFERENCE, REFERENCE_INTERFACE, REFERENCE_POLICY, REFERENCE_BINDING}; + + private static final String[] SYSTEM_REFERENCE_BINDING_PHASES = + {REFERENCE_BINDING_WIREFORMAT, REFERENCE_BINDING_POLICY, REFERENCE_BINDING_TRANSPORT}; + + private static final String[] SYSTEM_SERVICE_BINDING_PHASES = + {SERVICE_BINDING_TRANSPORT, SERVICE_BINDING_OPERATION_SELECTOR, SERVICE_BINDING_WIREFORMAT, SERVICE_BINDING_POLICY}; + + private static final String[] SYSTEM_SERVICE_PHASES = + {SERVICE_BINDING, SERVICE_POLICY, SERVICE_INTERFACE, SERVICE}; + + private static final String[] SYSTEM_IMPLEMENTATION_PHASES = {IMPLEMENTATION_POLICY, IMPLEMENTATION}; + + private ExtensionPointRegistry registry; + private String pattern = Phase.class.getName(); + private Map<String, Stage> stages; + private List<String> phases; + + public class Stage { + private String name; + private PhaseSorter<String> sorter = new PhaseSorter<String>(); + private Set<String> firstSet = new HashSet<String>(); + private Set<String> lastSet = new HashSet<String>(); + private List<String> phases = new ArrayList<String>(); + + public Stage(String name) { + super(); + this.name = name; + } + + public String getName() { + return name; + } + + public PhaseSorter<String> getSorter() { + return sorter; + } + + public Set<String> getFirstSet() { + return firstSet; + } + + public Set<String> getLastSet() { + return lastSet; + } + + public List<String> getPhases() { + return phases; + } + + @Override + public String toString() { + return name + phases; + } + } + + /** + * @param registry + */ + public PhaseManager(ExtensionPointRegistry registry) { + super(); + this.registry = registry; + } + + public static PhaseManager getInstance(ExtensionPointRegistry registry) { + UtilityExtensionPoint utilityExtensionPoint = registry.getExtensionPoint(UtilityExtensionPoint.class); + return utilityExtensionPoint.getUtility(PhaseManager.class); + } + + // For unit test purpose + PhaseManager(String pattern) { + super(); + this.pattern = pattern; + this.registry = new DefaultExtensionPointRegistry(); + } + + private List<String> getPhases(String stage) { + Stage s = getStages().get(stage); + return s == null ? null : s.getPhases(); + } + + public List<String> getReferencePhases() { + return getPhases(STAGE_REFERENCE); + } + + public List<String> getServicePhases() { + return getPhases(STAGE_SERVICE); + } + + public List<String> getReferenceBindingPhases() { + return getPhases(STAGE_REFERENCE_BINDING); + } + + public List<String> getServiceBindingPhases() { + return getPhases(STAGE_SERVICE_BINDING); + } + + public List<String> getImplementationPhases() { + return getPhases(STAGE_IMPLEMENTATION); + } + + public synchronized List<String> getAllPhases() { + if (phases == null) { + phases = new ArrayList<String>(); + phases.addAll(getReferencePhases()); + phases.addAll(getReferenceBindingPhases()); + phases.addAll(getServiceBindingPhases()); + phases.addAll(getServicePhases()); + phases.addAll(getImplementationPhases()); + } + return phases; + } + + public synchronized Map<String, Stage> getStages() { + if (stages != null) { + return stages; + } + init(); + + Collection<ServiceDeclaration> services; + try { + services = registry.getServiceDiscovery().getServiceDeclarations(pattern); + } catch (IOException e) { + throw new ServiceRuntimeException(e); + } + + for (ServiceDeclaration d : services) { + if (log.isLoggable(Level.FINE)) { + log.fine(d.getLocation() + ": " + d.getAttributes()); + } + String name = d.getAttributes().get("name"); + if (name == null) { + throw new ServiceRuntimeException("Required attribute 'name' is missing."); + } + String stageName = d.getAttributes().get("stage"); + if (stageName == null) { + throw new ServiceRuntimeException("Required attribute 'stage' is missing."); + } + Stage stage = stages.get(stageName); + if (stage == null) { + throw new ServiceRuntimeException("Invalid stage: " + stageName); + } + PhaseSorter<String> graph = stage.getSorter(); + Set<String> firstSet = stage.getFirstSet(), lastSet = stage.getLastSet(); + + String before = d.getAttributes().get("before"); + String after = d.getAttributes().get("after"); + if (before != null) { + StringTokenizer tokenizer = new StringTokenizer(before); + while (tokenizer.hasMoreTokens()) { + String p = tokenizer.nextToken(); + if (!"*".equals(p)) { + graph.addEdge(name, p); + } else { + firstSet.add(name); + } + } + } + if (after != null) { + StringTokenizer tokenizer = new StringTokenizer(after); + while (tokenizer.hasMoreTokens()) { + String p = tokenizer.nextToken(); + if (!"*".equals(p)) { + graph.addEdge(p, name); + } else { + lastSet.add(name); + } + } + } + graph.addVertext(name); + if(firstSet.size()>1) { + log.warning("More than one phases are declared to be first: "+firstSet); + } + for (String s : firstSet) { + for (String v : new HashSet<String>(graph.getVertices().keySet())) { + if (!firstSet.contains(v)) { + graph.addEdge(s, v); + } + } + } + if(lastSet.size()>1) { + log.warning("More than one phases are declared to be the last: "+lastSet); + } + for (String s : lastSet) { + for (String v : new HashSet<String>(graph.getVertices().keySet())) { + if (!lastSet.contains(v)) { + graph.addEdge(v, s); + } + } + } + + } + + for (Stage s : stages.values()) { + List<String> phases = s.getSorter().topologicalSort(false); + s.getPhases().clear(); + s.getPhases().addAll(phases); + } + if (log.isLoggable(Level.FINE)) { + log.fine("Stages: " + stages); + } + return stages; + } + + private void init() { + stages = new HashMap<String, Stage>(); + + Stage referenceStage = new Stage(STAGE_REFERENCE); + for (int i = 1; i < SYSTEM_REFERENCE_PHASES.length; i++) { + referenceStage.getSorter().addEdge(SYSTEM_REFERENCE_PHASES[i - 1], SYSTEM_REFERENCE_PHASES[i]); + } + referenceStage.getLastSet().add(REFERENCE_BINDING); + stages.put(referenceStage.getName(), referenceStage); + + Stage referenceBindingStage = new Stage(STAGE_REFERENCE_BINDING); + for (int i = 1; i < SYSTEM_REFERENCE_BINDING_PHASES.length; i++) { + referenceBindingStage.getSorter().addEdge(SYSTEM_REFERENCE_BINDING_PHASES[i - 1], SYSTEM_REFERENCE_BINDING_PHASES[i]); + } + stages.put(referenceBindingStage.getName(), referenceBindingStage); + + Stage serviceBindingStage = new Stage(STAGE_SERVICE_BINDING); + for (int i = 1; i < SYSTEM_SERVICE_BINDING_PHASES.length; i++) { + serviceBindingStage.getSorter().addEdge(SYSTEM_SERVICE_BINDING_PHASES[i - 1], SYSTEM_SERVICE_BINDING_PHASES[i]); + } + stages.put(serviceBindingStage.getName(), serviceBindingStage); + + + Stage serviceStage = new Stage(STAGE_SERVICE); + for (int i = 1; i < SYSTEM_SERVICE_PHASES.length; i++) { + serviceStage.getSorter().addEdge(SYSTEM_SERVICE_PHASES[i - 1], SYSTEM_SERVICE_PHASES[i]); + } + stages.put(serviceStage.getName(), serviceStage); + + Stage implementationStage = new Stage(STAGE_IMPLEMENTATION); + for (int i = 1; i < SYSTEM_IMPLEMENTATION_PHASES.length; i++) { + implementationStage.getSorter().addEdge(SYSTEM_IMPLEMENTATION_PHASES[i - 1], + SYSTEM_IMPLEMENTATION_PHASES[i]); + } + implementationStage.getLastSet().add(IMPLEMENTATION); + stages.put(implementationStage.getName(), implementationStage); + } +} diff --git a/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/PhaseSorter.java b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/PhaseSorter.java new file mode 100644 index 0000000000..175f3463ad --- /dev/null +++ b/tags/java/sca/2.0-M4-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/PhaseSorter.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.invocation.impl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Directed, weighted graph + * + * @param <V> The type of vertex object + * @param <E> The type of edge object + * + * @version $Rev$ $Date$ + */ +public class PhaseSorter<V> implements Cloneable { + private final Map<V, Vertex> vertices = new HashMap<V, Vertex>(); + + /** + * Vertex of a graph + */ + public final class Vertex { + private V value; + + // TODO: Do we want to support multiple edges for a vertex pair? If so, + // we should use a List instead of Map + private Map<Vertex, Edge> outEdges = new HashMap<Vertex, Edge>(); + private Map<Vertex, Edge> inEdges = new HashMap<Vertex, Edge>(); + + private Vertex(V value) { + this.value = value; + } + + @Override + public String toString() { + return "(" + value + ")"; + } + + public V getValue() { + return value; + } + + public Map<Vertex, Edge> getOutEdges() { + return outEdges; + } + + public Map<Vertex, Edge> getInEdges() { + return inEdges; + } + + } + + /** + * An Edge connects two vertices in one direction + */ + public final class Edge { + private Vertex sourceVertex; + + private Vertex targetVertex; + + public Edge(Vertex source, Vertex target) { + this.sourceVertex = source; + this.targetVertex = target; + } + + @Override + public String toString() { + return sourceVertex + "->" + targetVertex; + } + + public Vertex getTargetVertex() { + return targetVertex; + } + + public void setTargetVertex(Vertex vertex) { + this.targetVertex = vertex; + } + + public Vertex getSourceVertex() { + return sourceVertex; + } + + public void setSourceVertex(Vertex sourceVertex) { + this.sourceVertex = sourceVertex; + } + } + + public void addEdge(V source, V target) { + Vertex s = getVertex(source); + if (s == null) { + s = new Vertex(source); + vertices.put(source, s); + } + Vertex t = getVertex(target); + if (t == null) { + t = new Vertex(target); + vertices.put(target, t); + } + Edge edge = new Edge(s, t); + s.outEdges.put(t, edge); + t.inEdges.put(s, edge); + } + + public void addVertext(V source) { + Vertex s = getVertex(source); + if (s == null) { + s = new Vertex(source); + vertices.put(source, s); + } + } + + public Vertex getVertex(V source) { + Vertex s = vertices.get(source); + return s; + } + + public boolean removeEdge(V source, V target) { + Vertex s = getVertex(source); + if (s == null) { + return false; + } + + Vertex t = getVertex(target); + if (t == null) { + return false; + } + + return s.outEdges.remove(t) != null && t.inEdges.remove(s) != null; + + } + + public void removeEdge(Edge edge) { + edge.sourceVertex.outEdges.remove(edge.targetVertex); + edge.targetVertex.inEdges.remove(edge.sourceVertex); + } + + public void removeVertex(Vertex vertex) { + vertices.remove(vertex.getValue()); + for (Edge e : new ArrayList<Edge>(vertex.outEdges.values())) { + removeEdge(e); + } + for (Edge e : new ArrayList<Edge>(vertex.inEdges.values())) { + removeEdge(e); + } + } + + public Edge getEdge(Vertex source, Vertex target) { + return source.outEdges.get(target); + } + + public Edge getEdge(V source, V target) { + Vertex sv = getVertex(source); + if (sv == null) { + return null; + } + Vertex tv = getVertex(target); + if (tv == null) { + return null; + } + return getEdge(getVertex(source), getVertex(target)); + } + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + for (Vertex v : vertices.values()) { + sb.append(v.outEdges.values()).append("\n"); + } + return sb.toString(); + } + + public Map<V, Vertex> getVertices() { + return vertices; + } + + public void addGraph(PhaseSorter<V> otherGraph) { + for (Vertex v : otherGraph.vertices.values()) { + for (Edge e : v.outEdges.values()) { + addEdge(e.sourceVertex.value, e.targetVertex.value); + } + } + } + + private Vertex getFirst() { + for (Vertex v : vertices.values()) { + if (v.inEdges.isEmpty()) { + return v; + } + } + if (!vertices.isEmpty()) { + throw new IllegalArgumentException("Circular ordering has been detected: " + toString()); + } else { + return null; + } + } + + public List<V> topologicalSort(boolean readOnly) { + PhaseSorter<V> graph = (!readOnly) ? this : (PhaseSorter<V>)clone(); + List<V> list = new ArrayList<V>(); + while (true) { + Vertex v = graph.getFirst(); + if (v == null) { + break; + } + list.add(v.getValue()); + graph.removeVertex(v); + } + + return list; + } + + @Override + public Object clone() { + PhaseSorter<V> copy = new PhaseSorter<V>(); + copy.addGraph(this); + return copy; + } +} |