diff options
Diffstat (limited to 'branches/sca-equinox/modules/node-impl/src/main/java/org/apache/tuscany/sca/node/impl/NodeImpl.java')
-rw-r--r-- | branches/sca-equinox/modules/node-impl/src/main/java/org/apache/tuscany/sca/node/impl/NodeImpl.java | 322 |
1 files changed, 131 insertions, 191 deletions
diff --git a/branches/sca-equinox/modules/node-impl/src/main/java/org/apache/tuscany/sca/node/impl/NodeImpl.java b/branches/sca-equinox/modules/node-impl/src/main/java/org/apache/tuscany/sca/node/impl/NodeImpl.java index fef0a8391e..e53f478864 100644 --- a/branches/sca-equinox/modules/node-impl/src/main/java/org/apache/tuscany/sca/node/impl/NodeImpl.java +++ b/branches/sca-equinox/modules/node-impl/src/main/java/org/apache/tuscany/sca/node/impl/NodeImpl.java @@ -19,32 +19,27 @@ package org.apache.tuscany.sca.node.impl; -import static org.apache.tuscany.sca.node.impl.NodeUtil.collectJARs; -import static org.apache.tuscany.sca.node.impl.NodeUtil.createContribution; +import static java.lang.System.currentTimeMillis; +import static org.apache.tuscany.sca.node.impl.NodeUtil.contribution; import static org.apache.tuscany.sca.node.impl.NodeUtil.createURI; -import static org.apache.tuscany.sca.node.impl.NodeUtil.getContributionURL; -import static org.apache.tuscany.sca.node.impl.NodeUtil.getResource; +import static org.apache.tuscany.sca.node.impl.NodeUtil.loadModules; +import static org.apache.tuscany.sca.node.impl.NodeUtil.startModules; +import static org.apache.tuscany.sca.node.impl.NodeUtil.stopModules; import java.io.ByteArrayInputStream; import java.io.File; import java.io.InputStream; import java.net.URI; import java.net.URL; -import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; import javax.xml.namespace.QName; -import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.stream.XMLInputFactory; -import javax.xml.stream.XMLOutputFactory; import javax.xml.stream.XMLStreamReader; -import javax.xml.transform.TransformerFactory; import org.apache.tuscany.sca.assembly.AssemblyFactory; import org.apache.tuscany.sca.assembly.Component; @@ -53,11 +48,9 @@ import org.apache.tuscany.sca.assembly.Composite; import org.apache.tuscany.sca.assembly.CompositeService; import org.apache.tuscany.sca.assembly.builder.CompositeBuilder; import org.apache.tuscany.sca.assembly.builder.CompositeBuilderExtensionPoint; -import org.apache.tuscany.sca.assembly.xml.CompositeDocumentProcessor; import org.apache.tuscany.sca.contribution.Artifact; import org.apache.tuscany.sca.contribution.Contribution; import org.apache.tuscany.sca.contribution.ContributionFactory; -import org.apache.tuscany.sca.contribution.processor.ExtensibleStAXArtifactProcessor; import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessor; import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessorExtensionPoint; import org.apache.tuscany.sca.contribution.processor.URLArtifactProcessor; @@ -73,6 +66,11 @@ import org.apache.tuscany.sca.core.ModuleActivatorExtensionPoint; import org.apache.tuscany.sca.core.UtilityExtensionPoint; import org.apache.tuscany.sca.core.assembly.ActivationException; import org.apache.tuscany.sca.core.assembly.CompositeActivator; +import org.apache.tuscany.sca.core.assembly.RuntimeAssemblyFactory; +import org.apache.tuscany.sca.core.invocation.ExtensibleProxyFactory; +import org.apache.tuscany.sca.core.invocation.ProxyFactory; +import org.apache.tuscany.sca.core.invocation.ProxyFactoryExtensionPoint; +import org.apache.tuscany.sca.definitions.SCADefinitions; import org.apache.tuscany.sca.implementation.node.ConfiguredNodeImplementation; import org.apache.tuscany.sca.implementation.node.NodeImplementationFactory; import org.apache.tuscany.sca.monitor.Monitor; @@ -84,6 +82,7 @@ import org.apache.tuscany.sca.node.SCAContribution; import org.apache.tuscany.sca.node.SCANode; import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.apache.tuscany.sca.runtime.RuntimeComponentContext; +import org.apache.tuscany.sca.work.WorkScheduler; import org.apache.tuscany.sca.workspace.Workspace; import org.apache.tuscany.sca.workspace.WorkspaceFactory; import org.apache.tuscany.sca.workspace.builder.ContributionBuilder; @@ -107,6 +106,7 @@ public class NodeImpl implements SCANode, SCAClient { // The composite loaded into this node private Composite composite; + private ExtensionPointRegistry extensionPoints; private Monitor monitor; private URLArtifactProcessor<Contribution> contributionProcessor; private ModelResolverExtensionPoint modelResolvers; @@ -115,16 +115,14 @@ public class NodeImpl implements SCANode, SCAClient { private ContributionFactory contributionFactory; private AssemblyFactory assemblyFactory; private XMLInputFactory inputFactory; - private XMLOutputFactory outputFactory; - private DocumentBuilderFactory documentBuilderFactory; - private TransformerFactory transformerFactory; - private StAXArtifactProcessor<Object> xmlProcessor; private ContributionBuilder contributionDependencyBuilder; - private CompositeBuilder domainCompositeBuilder; + private CompositeBuilder compositeBuilder; private StAXArtifactProcessorExtensionPoint xmlProcessors; - private URLArtifactProcessorExtensionPoint documentProcessors; - private RuntimeBootStrapper runtime; + private StAXArtifactProcessor<Composite> compositeProcessor; + private ProxyFactory proxyFactory; + private List<ModuleActivator> modules; private CompositeActivator compositeActivator; + private WorkScheduler workScheduler; /** * Constructs a new SCA node. @@ -166,94 +164,6 @@ public class NodeImpl implements SCANode, SCAClient { } } - /** - * Construct a node by discovering the node configuration (composite+contrbutions) on the classpath - * @param classLoader - * @param compositeURI - */ - NodeImpl(ClassLoader classLoader, String compositeURI) { - configurationName = compositeURI; - logger.log(Level.INFO, "Creating node: " + configurationName); - - if (compositeURI != null) { - URI uri = URI.create(compositeURI); - if (uri.isAbsolute()) { - throw new IllegalArgumentException("Composite URI must be a resource name: " + compositeURI); - } - } - try { - // Initialize the runtime - init(); - - ConfiguredNodeImplementation config = findNodeConfiguration(compositeURI, classLoader); - configureNode(config); - } catch (Throwable e) { - throw new ServiceRuntimeException(e); - } - } - - /** - * Discover the contribution on the classpath - * @param compositeURI - * @param classLoader - * @return A configured node implementation - * @throws Exception - */ - private ConfiguredNodeImplementation findNodeConfiguration(final String compositeURI, ClassLoader classLoader) - throws Exception { - NodeImplementationFactory nodeImplementationFactory = - modelFactories.getFactory(NodeImplementationFactory.class); - ConfiguredNodeImplementation config = nodeImplementationFactory.createConfiguredNodeImplementation(); - - // Default to thread context classloader - if (classLoader == null) { - classLoader = Thread.currentThread().getContextClassLoader(); - } - String contributionArtifactPath = compositeURI; - URL contributionArtifactURL = null; - if (compositeURI != null) { - contributionArtifactURL = getResource(classLoader, compositeURI); - if (contributionArtifactURL == null) { - throw new IllegalArgumentException("Composite not found: " + contributionArtifactPath); - } - // Set to relative URI to avoid duplicate loading - Composite composite = createComposite(compositeURI); - config.setComposite(composite); - } else { - // No composite is specified, tring to search the SCA metadata files - contributionArtifactPath = Contribution.SCA_CONTRIBUTION_META; - contributionArtifactURL = getResource(classLoader, Contribution.SCA_CONTRIBUTION_META); - - if (contributionArtifactURL == null) { - contributionArtifactPath = Contribution.SCA_CONTRIBUTION_GENERATED_META; - contributionArtifactURL = getResource(classLoader, Contribution.SCA_CONTRIBUTION_GENERATED_META); - } - if (contributionArtifactURL == null) { - contributionArtifactPath = Contribution.SCA_CONTRIBUTION_DEPLOYABLES; - contributionArtifactURL = getResource(classLoader, Contribution.SCA_CONTRIBUTION_DEPLOYABLES); - } - - // No contribution can be discovered - if (contributionArtifactURL == null) { - throw new IllegalArgumentException("No default contribution can be discovered on the classpath"); - } - - // No composite will be created, all deployable composites will be used later - } - - Contribution c = getContribution(contributionArtifactURL, contributionArtifactPath); - config.getContributions().add(c); - - return config; - } - - private Contribution getContribution(URL contributionArtifactURL, String contributionArtifactPath) { - URL contributionURL = getContributionURL(contributionArtifactURL, contributionArtifactPath); - SCAContribution contribution = new SCAContribution(contributionURL.toString(), contributionURL.toString()); - Contribution c = createContribution(contributionFactory, contribution); - return c; - } - /** * Constructs a new SCA node. * @@ -268,28 +178,21 @@ public class NodeImpl implements SCANode, SCAClient { // Initialize the runtime init(); - URI uri = compositeURI == null ? null : URI.create(compositeURI); - ConfiguredNodeImplementation configuration = null; - if (contributions == null || contributions.length == 0) { - if (uri != null && uri.getScheme() != null) { - throw new IllegalArgumentException("No SCA contributions are provided"); - } - configuration = findNodeConfiguration(compositeURI, null); - } else { - - // Create a node configuration - NodeImplementationFactory nodeImplementationFactory = - modelFactories.getFactory(NodeImplementationFactory.class); - configuration = nodeImplementationFactory.createConfiguredNodeImplementation(); + // Create a node configuration + NodeImplementationFactory nodeImplementationFactory = modelFactories.getFactory(NodeImplementationFactory.class); + ConfiguredNodeImplementation configuration = nodeImplementationFactory.createConfiguredNodeImplementation(); - Composite composite = compositeURI == null ? null : createComposite(compositeURI); + if (compositeURI != null) { + Composite composite = assemblyFactory.createComposite(); + composite.setURI(compositeURI); + composite.setUnresolved(true); configuration.setComposite(composite); + } // Create contribution models - for (SCAContribution c : contributions) { - Contribution contribution = createContribution(contributionFactory, c); - configuration.getContributions().add(contribution); - } + for (SCAContribution c : contributions) { + Contribution contribution = contribution(contributionFactory, c); + configuration.getContributions().add(contribution); } // Configure the node @@ -300,14 +203,6 @@ public class NodeImpl implements SCANode, SCAClient { } } - private Composite createComposite(String compositeURI) { - // Create composite model - Composite composite = assemblyFactory.createComposite(); - composite.setURI(compositeURI); - composite.setUnresolved(true); - return composite; - } - /** * Constructs a new SCA node. * @@ -323,32 +218,29 @@ public class NodeImpl implements SCANode, SCAClient { // Initialize the runtime init(); - ConfiguredNodeImplementation configuration = null; - if (contributions == null || contributions.length == 0) { - configuration = findNodeConfiguration(compositeURI, null); - } else { - // Create a node configuration - NodeImplementationFactory nodeImplementationFactory = - modelFactories.getFactory(NodeImplementationFactory.class); - configuration = nodeImplementationFactory.createConfiguredNodeImplementation(); - - // Read the composite model - StAXArtifactProcessor<Composite> compositeProcessor = xmlProcessors.getProcessor(Composite.class); - // URL compositeURL = new URL(compositeURI); - logger.log(Level.INFO, "Loading composite: " + compositeURI); + // Create a node configuration + NodeImplementationFactory nodeImplementationFactory = modelFactories.getFactory(NodeImplementationFactory.class); + ConfiguredNodeImplementation configuration = nodeImplementationFactory.createConfiguredNodeImplementation(); - CompositeDocumentProcessor compositeDocProcessor = (CompositeDocumentProcessor)documentProcessors.getProcessor(Composite.class); - composite = compositeDocProcessor.read(URI.create(compositeURI), new ByteArrayInputStream(compositeContent.getBytes("UTF-8"))); + // Read the composite model + logger.log(Level.INFO, "Loading composite: " + compositeURI); - analyzeProblems(); + XMLStreamReader reader = inputFactory.createXMLStreamReader(new ByteArrayInputStream(compositeContent.getBytes("UTF-8"))); + reader.nextTag(); + + // Read the composite model + composite = (Composite)compositeProcessor.read(reader); + if (composite != null) { + composite.setURI(compositeURI); + } + analyzeProblems(); - configuration.setComposite(composite); + configuration.setComposite(composite); - // Create contribution models - for (SCAContribution c : contributions) { - Contribution contribution = createContribution(contributionFactory, c); - configuration.getContributions().add(contribution); - } + // Create contribution models + for (SCAContribution c : contributions) { + Contribution contribution = contribution(contributionFactory, c); + configuration.getContributions().add(contribution); } // Configure the node @@ -360,9 +252,10 @@ public class NodeImpl implements SCANode, SCAClient { } private void init() { + long start = currentTimeMillis(); // Create extension point registry - ExtensionPointRegistry extensionPoints = new DefaultExtensionPointRegistry(); + extensionPoints = new DefaultExtensionPointRegistry(); // Create a monitor UtilityExtensionPoint utilities = extensionPoints.getExtensionPoint(UtilityExtensionPoint.class); @@ -378,17 +271,19 @@ public class NodeImpl implements SCANode, SCAClient { // Get XML input/output factories modelFactories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class); inputFactory = modelFactories.getFactory(XMLInputFactory.class); - outputFactory = modelFactories.getFactory(XMLOutputFactory.class); // Get contribution workspace and assembly model factories contributionFactory = modelFactories.getFactory(ContributionFactory.class); workspaceFactory = modelFactories.getFactory(WorkspaceFactory.class); assemblyFactory = modelFactories.getFactory(AssemblyFactory.class); + + // Use the runtime-enabled assembly factory + assemblyFactory = new RuntimeAssemblyFactory(); + modelFactories.addFactory(assemblyFactory); // Create XML artifact processors xmlProcessors = extensionPoints.getExtensionPoint(StAXArtifactProcessorExtensionPoint.class); - documentProcessors = extensionPoints.getExtensionPoint(URLArtifactProcessorExtensionPoint.class); - xmlProcessor = new ExtensibleStAXArtifactProcessor(xmlProcessors, inputFactory, outputFactory, monitor); + compositeProcessor = xmlProcessors.getProcessor(Composite.class); // Create contribution content processor URLArtifactProcessorExtensionPoint docProcessorExtensions = extensionPoints.getExtensionPoint(URLArtifactProcessorExtensionPoint.class); @@ -403,25 +298,39 @@ public class NodeImpl implements SCANode, SCAClient { // Get composite builders CompositeBuilderExtensionPoint compositeBuilders = extensionPoints.getExtensionPoint(CompositeBuilderExtensionPoint.class); - domainCompositeBuilder = compositeBuilders.getCompositeBuilder("org.apache.tuscany.sca.assembly.builder.CompositeBuilder"); - } - - /** - * Initialize the Tuscany runtime. - * - * @throws Exception - */ - private void initRuntime() throws Exception { + compositeBuilder = compositeBuilders.getCompositeBuilder("org.apache.tuscany.sca.assembly.builder.CompositeBuilder"); + + // Initialize runtime - // Create a node runtime - runtime = new RuntimeBootStrapper(Thread.currentThread().getContextClassLoader()); - runtime.start(); + // Load the runtime modules + try { + modules = loadModules(extensionPoints); - // Save the composite activator - compositeActivator = runtime.getCompositeActivator(); + // Start the runtime modules + startModules(extensionPoints, modules); + + } catch (ActivationException e) { + throw new IllegalStateException(e); + } - } + // Get proxy factory + ProxyFactoryExtensionPoint proxyFactories = extensionPoints.getExtensionPoint(ProxyFactoryExtensionPoint.class); + proxyFactory = new ExtensibleProxyFactory(proxyFactories); + // Get the composite activator + compositeActivator = utilities.getUtility(CompositeActivator.class); + + workScheduler = utilities.getUtility(WorkScheduler.class); + + // Load the definitions.xml + //loadSCADefinitions(); + + if (logger.isLoggable(Level.FINE)) { + long end = currentTimeMillis(); + logger.fine("The tuscany runtime started in " + (end - start) + " ms."); + } + } + private void configureNode(ConfiguredNodeImplementation configuration) throws Exception { // Create workspace model @@ -460,20 +369,46 @@ public class NodeImpl implements SCANode, SCAClient { } composite = configuration.getComposite(); - if (composite.isUnresolved()) { - - // Find the composite in the given contributions - for (Contribution contribution: workspace.getContributions()) { - composite = contribution.getModelResolver().resolveModel(Composite.class, composite); + + // Find the composite in the given contributions + boolean found = false; + Artifact compositeFile = contributionFactory.createArtifact(); + compositeFile.setUnresolved(true); + compositeFile.setURI(composite.getURI()); + for (Contribution contribution: workspace.getContributions()) { + ModelResolver resolver = contribution.getModelResolver(); + Artifact resolvedArtifact = resolver.resolveModel(Artifact.class, compositeFile); + if (!resolvedArtifact.isUnresolved() && resolvedArtifact.getModel() instanceof Composite) { + if (!composite.isUnresolved()) { - break; + + // The composite content was passed into the node and read into a composite model, + // don't use the composite found in the contribution, use that composite, but just resolve + // it within the context of the contribution + compositeProcessor.resolve(composite, resolver); + + } else { + + // Use the resolved composite we've found in the contribution + composite = (Composite)resolvedArtifact.getModel(); } + found = true; + break; } } + if (!found) { + throw new IllegalArgumentException("Composite not found: " + composite.getURI()); + } + // Build an aggregated SCA definitions model + SCADefinitions definitions = null; + //definitions = new SCADefinitionsImpl(); + //for (SCADefinitions definition : ((List<SCADefinitions>)policyDefinitions)) { + // SCADefinitionsUtil.aggregateSCADefinitions(definition, aggregatedDefinitions); + //} + // Build the composite and wire the components included in it - domainCompositeBuilder.build(composite); - + compositeBuilder.build(composite, definitions, monitor); analyzeProblems(); // Create a top level composite to host our composite @@ -489,44 +424,49 @@ public class NodeImpl implements SCANode, SCAClient { // logic in callable reference resolution relies on this being // available compositeActivator.setDomainComposite(tempComposite); - } public void start() { logger.log(Level.INFO, "Starting node: " + configurationName); try { - + // Activate the composite compositeActivator.activate(composite); // Start the composite compositeActivator.start(composite); - + } catch (ActivationException e) { - throw new ServiceRuntimeException(e); + throw new IllegalStateException(e); } + } public void stop() { logger.log(Level.INFO, "Stopping node: " + configurationName); try { - + // Stop the composite compositeActivator.stop(composite); - + // Deactivate the composite compositeActivator.deactivate(composite); - runtime.stop(); } catch (ActivationException e) { - throw new ServiceRuntimeException(e); + throw new IllegalStateException(e); } + + // Stop the runtime modules + stopModules(extensionPoints, modules); + + // Stop and destroy the work manager + workScheduler.destroy(); } public <B, R extends CallableReference<B>> R cast(B target) throws IllegalArgumentException { - return (R)runtime.getProxyFactory().cast(target); + return (R)proxyFactory.cast(target); } public <B> B getService(Class<B> businessInterface, String serviceName) { |