diff options
Diffstat (limited to 'sandbox/rfeng/minicore/src/main/java/org/apache/tuscany/core/builder/ConnectorImpl.java')
-rw-r--r-- | sandbox/rfeng/minicore/src/main/java/org/apache/tuscany/core/builder/ConnectorImpl.java | 385 |
1 files changed, 385 insertions, 0 deletions
diff --git a/sandbox/rfeng/minicore/src/main/java/org/apache/tuscany/core/builder/ConnectorImpl.java b/sandbox/rfeng/minicore/src/main/java/org/apache/tuscany/core/builder/ConnectorImpl.java new file mode 100644 index 0000000000..6923fb6837 --- /dev/null +++ b/sandbox/rfeng/minicore/src/main/java/org/apache/tuscany/core/builder/ConnectorImpl.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.core.builder; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import javax.xml.namespace.QName; + +import org.osoa.sca.annotations.Constructor; + +import org.apache.tuscany.spi.builder.BuilderException; +import org.apache.tuscany.spi.builder.Connector; +import org.apache.tuscany.spi.builder.WiringException; +import org.apache.tuscany.spi.builder.interceptor.InterceptorBuilderRegistry; +import org.apache.tuscany.spi.builder.physical.WireAttacherRegistry; +import org.apache.tuscany.spi.component.AtomicComponent; +import org.apache.tuscany.spi.component.Component; +import org.apache.tuscany.spi.component.ComponentManager; +import org.apache.tuscany.spi.component.Invocable; +import org.apache.tuscany.spi.component.Reference; +import org.apache.tuscany.spi.component.ReferenceBinding; +import org.apache.tuscany.spi.component.Service; +import org.apache.tuscany.spi.component.ServiceBinding; +import org.apache.tuscany.spi.component.TargetInvokerCreationException; +import org.apache.tuscany.spi.component.WorkContext; +import org.apache.tuscany.spi.model.ComponentDefinition; +import org.apache.tuscany.spi.model.ComponentType; +import org.apache.tuscany.spi.model.CompositeComponentType; +import org.apache.tuscany.spi.model.Implementation; +import org.apache.tuscany.spi.model.Operation; +import org.apache.tuscany.spi.model.ReferenceDefinition; +import org.apache.tuscany.spi.model.ReferenceTarget; +import org.apache.tuscany.spi.model.Scope; +import org.apache.tuscany.spi.model.ServiceContract; +import org.apache.tuscany.spi.model.ServiceDefinition; +import org.apache.tuscany.spi.model.physical.PhysicalInterceptorDefinition; +import org.apache.tuscany.spi.model.physical.PhysicalOperationDefinition; +import org.apache.tuscany.spi.model.physical.PhysicalWireDefinition; +import org.apache.tuscany.spi.model.physical.PhysicalWireSourceDefinition; +import org.apache.tuscany.spi.model.physical.PhysicalWireTargetDefinition; +import org.apache.tuscany.spi.services.work.WorkScheduler; +import org.apache.tuscany.spi.util.UriHelper; +import org.apache.tuscany.spi.wire.Interceptor; +import org.apache.tuscany.spi.wire.InvocationChain; +import org.apache.tuscany.spi.wire.Wire; +import org.apache.tuscany.spi.wire.WirePostProcessorRegistry; + +import org.apache.tuscany.core.wire.InvocationChainImpl; +import org.apache.tuscany.core.wire.InvokerInterceptor; +import org.apache.tuscany.core.wire.NonBlockingInterceptor; +import org.apache.tuscany.core.wire.WireImpl; +import org.apache.tuscany.core.wire.WireUtils; + +/** + * The default connector implmentation + * + * @version $$Rev$$ $$Date$$ + */ +public class ConnectorImpl implements Connector { + private WirePostProcessorRegistry postProcessorRegistry; + private ComponentManager componentManager; + private WorkContext workContext; + private WorkScheduler scheduler; + private InterceptorBuilderRegistry interceptorBuilderRegistry; + private WireAttacherRegistry attacherRegistry; + + public ConnectorImpl(ComponentManager componentManager) { + this.componentManager = componentManager; + } + + @Constructor + public ConnectorImpl( + @org.osoa.sca.annotations.Reference InterceptorBuilderRegistry interceptorBuilderRegistry, + @org.osoa.sca.annotations.Reference WireAttacherRegistry attacherRegistry, + @org.osoa.sca.annotations.Reference WirePostProcessorRegistry processorRegistry, + @org.osoa.sca.annotations.Reference ComponentManager componentManager, + @org.osoa.sca.annotations.Reference WorkScheduler scheduler, + @org.osoa.sca.annotations.Reference WorkContext workContext) { + this.attacherRegistry = attacherRegistry; + this.interceptorBuilderRegistry = interceptorBuilderRegistry; + this.postProcessorRegistry = processorRegistry; + this.componentManager = componentManager; + this.scheduler = scheduler; + this.workContext = workContext; + } + + /** + * <strong>Note this method will not work yet</strong> + * <p/> + * Wires a source and target component based on a wire defintion + * + * @param definition the wire definition + * @throws WiringException + */ + public void connect(PhysicalWireDefinition definition) throws BuilderException { + URI sourceUri = definition.getSourceUri(); + assert sourceUri != null; + URI targetUri = definition.getTargetUri(); + assert targetUri != null; + URI baseSourceUri = UriHelper.getDefragmentedName(sourceUri); + URI baseTargetUri = UriHelper.getDefragmentedName(targetUri); + Component source = componentManager.getComponent(baseSourceUri); + if (source == null) { + throw new ComponentNotFoundException("Wire source component not found", baseSourceUri); + } + Wire wire = createWire(definition); + + PhysicalWireSourceDefinition sourceDefinition = definition.getSource(); + PhysicalWireTargetDefinition targetDefinition = definition.getTarget(); + Component target; + if (baseTargetUri != null) { + target = componentManager.getComponent(baseTargetUri); + if (target == null) { + throw new ComponentNotFoundException("Wire target component not found", baseTargetUri); + } + } else { + target = null; + } + attacherRegistry.attachToSource(source, sourceDefinition, target, targetDefinition, wire); + attacherRegistry.attachToTarget(source, sourceDefinition, target, targetDefinition, wire); + } + + public void connect(ComponentDefinition<? extends Implementation<?>> definition) throws WiringException { + URI sourceUri = definition.getUri(); + Component source = componentManager.getComponent(sourceUri); + if (source == null) { + throw new ComponentNotFoundException("Source not found", sourceUri); + } + ComponentType<?, ?, ?> type = definition.getImplementation().getComponentType(); + if (type instanceof CompositeComponentType) { + CompositeComponentType<?, ?, ?> compositeType = (CompositeComponentType<?, ?, ?>) type; + for (ComponentDefinition<? extends Implementation<?>> child : compositeType.getComponents().values()) { + connect(child); + } + for (ServiceDefinition child : compositeType.getServices().values()) { + connect(child); + } + for (ReferenceDefinition child : compositeType.getReferences().values()) { + connect(child); + } + } + Map<String, ReferenceTarget> targets = definition.getReferenceTargets(); + for (ReferenceTarget referenceTarget : targets.values()) { + List<Wire> wires = new ArrayList<Wire>(); + String refName = referenceTarget.getReferenceName().getFragment(); + ReferenceDefinition refDefinition = type.getReferences().get(refName); + assert refDefinition != null; + List<URI> uris = referenceTarget.getTargets(); + for (URI uri : uris) { + URI targetUri = UriHelper.getDefragmentedName(uri); + Component target = componentManager.getComponent(targetUri); + if (target == null && !refDefinition.isRequired()) { + // a non-required reference, just skip + continue; + } + if (target == null) { + throw new ComponentNotFoundException("Target not found", targetUri); + } + String fragment = uri.getFragment(); + URI sourceURI = refDefinition.getUri(); + Wire wire = createWire(sourceURI, uri, refDefinition.getServiceContract(), Wire.LOCAL_BINDING); + try { + attachInvokers(fragment, wire, source, target); + } catch (TargetInvokerCreationException e) { + throw new WireCreationException("Error creating invoker", sourceUri, targetUri, e); + } + if (postProcessorRegistry != null) { + postProcessorRegistry.process(wire); + } + optimize(source, target, wire); + wires.add(wire); + if (!wire.getCallbackInvocationChains().isEmpty()) { + target.attachCallbackWire(wire); + } + } + if (wires.size() > 1) { + // attach as a multiplicity + source.attachWires(wires); + } else if (wires.size() == 1) { + // attach as a single wire + Wire wire = wires.get(0); + source.attachWire(wire); + } + } + } + + /** + * @deprecated + */ + protected void connect(ServiceDefinition definition) throws WiringException { + URI uri = definition.getUri(); + URI sourceUri = UriHelper.getDefragmentedName(uri); + URI targetUri = definition.getTarget(); + URI baseTargetUri = UriHelper.getDefragmentedName(targetUri); + Component source = componentManager.getComponent(sourceUri); + if (source == null) { + throw new ComponentNotFoundException("Source not found", sourceUri); + } + Service service = source.getService(uri.getFragment()); + if (service == null) { + throw new SourceServiceNotFoundException("Service not found on composite", uri); + } + Component target = componentManager.getComponent(baseTargetUri); + if (target == null) { + throw new ComponentNotFoundException("Target not found", sourceUri); + } + ServiceContract<?> contract = definition.getServiceContract(); + // TODO if no binding, do local + for (ServiceBinding binding : service.getServiceBindings()) { + Wire wire = createWire(uri, targetUri, contract, binding.getBindingType()); + binding.setWire(wire); + if (postProcessorRegistry != null) { + postProcessorRegistry.process(wire); + } + try { + attachInvokers(definition.getTarget().getFragment(), wire, binding, target); + } catch (TargetInvokerCreationException e) { + throw new WireCreationException("Error creating invoker", sourceUri, baseTargetUri, e); + } + } + } + + /** + * @deprecated + */ + protected void connect(ReferenceDefinition definition) throws WiringException { + URI uri = definition.getUri(); + URI sourceUri = UriHelper.getDefragmentedName(uri); + Component source = componentManager.getComponent(sourceUri); + if (source == null) { + throw new ComponentNotFoundException("Source not found", sourceUri); + } + Reference reference = source.getReference(uri.getFragment()); + if (reference == null) { + throw new SourceServiceNotFoundException("Reference not found on composite", uri); + } + + for (ReferenceBinding binding : reference.getReferenceBindings()) { + // create wire + if (Wire.LOCAL_BINDING.equals(binding.getBindingType())) { + URI targetUri = binding.getTargetUri(); + ServiceContract<?> contract = binding.getBindingServiceContract(); + QName type = binding.getBindingType(); + Wire wire = createWire(sourceUri, targetUri, contract, type); + binding.setWire(wire); + // wire local bindings to their targets + Component target = componentManager.getComponent(UriHelper.getDefragmentedName(targetUri)); + if (target == null) { + throw new ComponentNotFoundException("Target not found", sourceUri); + } + try { + attachInvokers(targetUri.getFragment(), wire, binding, target); + } catch (TargetInvokerCreationException e) { + throw new WireCreationException("Error creating invoker", sourceUri, targetUri, e); + } + } else { + Wire wire = createWire(sourceUri, null, binding.getBindingServiceContract(), binding.getBindingType()); + if (postProcessorRegistry != null) { + postProcessorRegistry.process(wire); + } + binding.setWire(wire); + } + } + } + + protected Wire createWire(PhysicalWireDefinition definition) throws BuilderException { + URI sourceURI = definition.getSourceUri(); + URI targetUri = definition.getTargetUri(); + Wire wire = new WireImpl(); + wire.setSourceUri(sourceURI); + wire.setTargetUri(targetUri); + for (PhysicalOperationDefinition operation : definition.getOperations()) { + InvocationChain chain = new InvocationChainImpl(operation); + for (PhysicalInterceptorDefinition interceptorDefinition : operation.getInterceptors()) { + Interceptor interceptor = interceptorBuilderRegistry.build(interceptorDefinition); + chain.addInterceptor(interceptor); + } + wire.addInvocationChain(operation, chain); + } + return wire; + } + + protected Wire createWire(URI sourceURI, URI targetUri, ServiceContract<?> contract, QName bindingType) { + Wire wire = new WireImpl(bindingType); + wire.setSourceContract(contract); + wire.setTargetContract(contract); + wire.setSourceUri(sourceURI); + wire.setTargetUri(targetUri); + for (Operation<?> operation : contract.getOperations().values()) { + InvocationChain chain = new InvocationChainImpl(operation); + if (operation.isNonBlocking()) { + chain.addInterceptor(new NonBlockingInterceptor(scheduler, workContext)); + } + chain.addInterceptor(new InvokerInterceptor()); + wire.addInvocationChain(operation, chain); + + } + for (Operation<?> operation : contract.getCallbackOperations().values()) { + InvocationChain chain = new InvocationChainImpl(operation); + if (operation.isNonBlocking()) { + chain.addInterceptor(new NonBlockingInterceptor(scheduler, workContext)); + } + chain.addInterceptor(new InvokerInterceptor()); + wire.addCallbackInvocationChain(operation, chain); + } + return wire; + } + + /** + * @Deprecated + */ + private void attachInvokers(String name, Wire wire, Invocable source, Invocable target) + throws TargetInvokerCreationException { + // TODO section will deleted be replaced when we cut-over to the physical marshallers + for (InvocationChain chain : wire.getInvocationChains().values()) { + chain.setTargetInvoker(target.createTargetInvoker(name, chain.getOperation())); + } + for (InvocationChain chain : wire.getCallbackInvocationChains().values()) { + chain.setTargetInvoker(source.createTargetInvoker(null, chain.getOperation())); + } + } + + /** + * @Deprecated + */ + protected void optimize(Component source, Component target, Wire wire) { + boolean optimizableScopes = isOptimizable(source.getScope(), target.getScope()); + if (optimizableScopes && target.isOptimizable() && WireUtils.isOptimizable(wire)) { + wire.setOptimizable(true); + wire.setTarget((AtomicComponent) target); + } else { + wire.setOptimizable(false); + } + } + + protected boolean isOptimizable(Scope pReferrer, Scope pReferee) { + if (pReferrer == Scope.UNDEFINED + || pReferee == Scope.UNDEFINED + || pReferrer == Scope.CONVERSATION + || pReferee == Scope.CONVERSATION) { + return false; + } + if (pReferee == pReferrer) { + return true; + } else if (pReferrer == Scope.STATELESS) { + return true; + } else if (pReferee == Scope.STATELESS) { + return false; + } else if (pReferrer == Scope.REQUEST && pReferee == Scope.SESSION) { + return true; + } else if (pReferrer == Scope.REQUEST && pReferee == Scope.COMPOSITE) { + return true; + } else if (pReferrer == Scope.REQUEST && pReferee == Scope.SYSTEM) { + return true; + } else if (pReferrer == Scope.SESSION && pReferee == Scope.COMPOSITE) { + return true; + } else if (pReferrer == Scope.SESSION && pReferee == Scope.SYSTEM) { + return true; + } else //noinspection SimplifiableIfStatement + if (pReferrer == Scope.SYSTEM && pReferee == Scope.COMPOSITE) { + // case where a service context points to a composite scoped component + return true; + } else { + return pReferrer == Scope.COMPOSITE && pReferee == Scope.SYSTEM; + } + } +}
\ No newline at end of file |