From 132aa8a77685ec92bc90c03f987650d275a7b639 Mon Sep 17 00:00:00 2001 From: lresende Date: Mon, 30 Sep 2013 06:59:11 +0000 Subject: 2.0.1 RC1 release tag git-svn-id: http://svn.us.apache.org/repos/asf/tuscany@1527464 13f79535-47bb-0310-9956-ffa450edef68 --- .../sca/core/work/impl/DefaultWorkScheduler.java | 210 +++++++++++++++++++ .../sca/core/work/impl/ThreadPoolWorkManager.java | 232 +++++++++++++++++++++ .../apache/tuscany/sca/core/work/impl/Work.java | 65 ++++++ .../tuscany/sca/core/work/impl/WorkEvent.java | 80 +++++++ .../tuscany/sca/core/work/impl/WorkItem.java | 167 +++++++++++++++ .../tuscany/sca/core/work/impl/WorkListener.java | 32 +++ 6 files changed, 786 insertions(+) create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/DefaultWorkScheduler.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManager.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/Work.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/WorkEvent.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/WorkItem.java create mode 100644 sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/WorkListener.java (limited to 'sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl') diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/DefaultWorkScheduler.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/DefaultWorkScheduler.java new file mode 100644 index 0000000000..dd4abf0cd2 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/DefaultWorkScheduler.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.work.impl; + +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +import org.apache.tuscany.sca.core.ExtensionPointRegistry; +import org.apache.tuscany.sca.core.LifeCycleListener; +import org.apache.tuscany.sca.work.NotificationListener; +import org.apache.tuscany.sca.work.WorkScheduler; +import org.apache.tuscany.sca.work.WorkSchedulerException; + +/** + * A work scheduler implementation based on a JSR 237 work manager. + *

+ *

+ * This needs a JSR 237 work manager implementation available for scheduling work. Instances can be configured with a + * work manager implementation that is injected in. It is the responsibility of the runtime environment to make a work + * manager implementation available. For example, if the managed environment supports work manager the runtime can use + * the appropriate lookup mechanism to inject the work manager implementation.

+ * + * @version $Rev$ $Date$ + */ +public class DefaultWorkScheduler implements WorkScheduler, LifeCycleListener { + + /** + * Underlying JSR-237 work manager + */ + private ThreadPoolWorkManager jsr237WorkManager; + private int maxThreads = 0; + + /** + * Initializes the JSR 237 work manager. + * + * @param jsr237WorkManager JSR 237 work manager. + */ + public DefaultWorkScheduler(ExtensionPointRegistry registry, Map attributes) { + if (attributes != null) { + String value = attributes.get("maxThreads"); + if (value != null) { + maxThreads = Integer.parseInt(value.trim()); + } + } + } + + private synchronized ThreadPoolWorkManager getWorkManager() { + if (jsr237WorkManager != null) { + return jsr237WorkManager; + } +// try { +// InitialContext ctx = new InitialContext(); +// jsr237WorkManager = (ThreadPoolWorkManager)ctx.lookup("java:comp/env/wm/TuscanyWorkManager"); +// } catch (Throwable e) { +// // ignore +// } + if (jsr237WorkManager == null) { + jsr237WorkManager = new ThreadPoolWorkManager(maxThreads); + } + return jsr237WorkManager; + } + + /** + * Schedules a unit of work for future execution. The notification listener is used to register interest in + * callbacks regarding the status of the work. + * + * @param work The unit of work that needs to be asynchronously executed. + */ + public void scheduleWork(T work) { + scheduleWork(work, null); + } + + /** + * Schedules a unit of work for future execution. The notification listener is used to register interest in + * callbacks regarding the status of the work. + * + * @param work The unit of work that needs to be asynchronously executed. + * @param listener Notification listener for callbacks. + */ + public void scheduleWork(T work, NotificationListener listener) { + + if (work == null) { + throw new IllegalArgumentException("Work cannot be null"); + } + + Work jsr237Work = new Work(work); + try { + if (listener == null) { + getWorkManager().schedule(jsr237Work); + } else { + Jsr237WorkListener jsr237WorkListener = new Jsr237WorkListener(listener, work); + getWorkManager().schedule(jsr237Work, jsr237WorkListener); + } + } catch (IllegalArgumentException ex) { + if (listener != null) { + listener.workRejected(work); + } else { + throw new WorkSchedulerException(ex); + } + } catch (Exception ex) { + throw new WorkSchedulerException(ex); + } + + } + + public void start() { + } + + public void stop() { + if (jsr237WorkManager instanceof ThreadPoolWorkManager) { + // Allow privileged access to modify threads. Requires RuntimePermission in security + // policy. + AccessController.doPrivileged(new PrivilegedAction() { + public Object run() { + ((ThreadPoolWorkManager)jsr237WorkManager).destroy(); + return null; + } + }); + } + } + + /* + * WorkListener for keeping track of work status callbacks. + * + */ + private class Jsr237WorkListener implements WorkListener { + + // Notification listener + private NotificationListener listener; + + // Work + private T work; + + /* + * Initializes the notification listener. + */ + public Jsr237WorkListener(NotificationListener listener, T work) { + this.listener = listener; + this.work = work; + } + + /* + * Callback when the work is accepted. + */ + public void workAccepted(WorkEvent workEvent) { + T work = getWork(); + listener.workAccepted(work); + } + + /* + * Callback when the work is rejected. + */ + public void workRejected(WorkEvent workEvent) { + T work = getWork(); + listener.workRejected(work); + } + + /* + * Callback when the work is started. + */ + public void workStarted(WorkEvent workEvent) { + T work = getWork(); + listener.workStarted(work); + } + + /* + * Callback when the work is completed. + */ + public void workCompleted(WorkEvent workEvent) { + T work = getWork(); + Exception exception = workEvent.getException(); + if (exception != null) { + listener.workFailed(work, exception); + } else { + listener.workCompleted(work); + } + } + + /* + * Gets the underlying work from the work event. + */ + private T getWork() { + return work; + } + + } + + @Override + public ExecutorService getExecutorService() { + return getWorkManager().getExecutorService(); + } +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManager.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManager.java new file mode 100644 index 0000000000..7e116655e1 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManager.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.work.impl; + +import java.util.Collection; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; + +import org.apache.tuscany.sca.work.WorkSchedulerException; + +/** + * A thread-pool based implementation for the JSR-237 work manager. + *

+ *

+ * This implementation supports only local work. + *

+ * TODO Elaborate the implementation.

+ * + * @version $Rev$ $Date$ + */ +public class ThreadPoolWorkManager { + + // Map of work items currently handled by the work manager + private Map workItems = new ConcurrentHashMap(); + + // Thread-pool + protected ExecutorService executor; + + /** + * Initializes the thread-pool. + * + * @param threadPoolSize Thread-pool size. If the size <1, then a cached pool is created + */ + public ThreadPoolWorkManager(int threadPoolSize) { + ThreadFactory factory = new ThreadFactory() { + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setDaemon(true); + return thread; + } + }; + if (threadPoolSize <= 0) { + + // Creates a new Executor, use a custom ThreadFactory that + // creates daemon threads. + executor = Executors.newCachedThreadPool(factory); + } else { + executor = Executors.newFixedThreadPool(threadPoolSize, factory); + } + } + + /** + * Schedules a unit of work asynchronously. + * + * @param work Work that needs to be scheduled. + * @return Work Work item representing the asynchronous work + */ + public WorkItem schedule(Work work) throws IllegalArgumentException { + return schedule(work, null); + } + + /** + * Schedules a unit of work asynchronously. + * + * @param work Work that needs to be scheduled. + * @param workListener Work listener for callbacks. + * @return Work Work item representing the asynchronous work + */ + public WorkItem schedule(Work work, WorkListener workListener) throws IllegalArgumentException { + + WorkItem workItem = new WorkItem(UUID.randomUUID().toString(), work); + if (workListener != null) { + workItems.put(workItem, workListener); + } + workAccepted(workItem, work); + if (scheduleWork(work, workItem)) { + return workItem; + } else { + workItem.setStatus(WorkEvent.WORK_REJECTED); + if (workListener != null) { + workListener.workRejected(new WorkEvent(workItem)); + } + throw new IllegalArgumentException("Unable to schedule work"); + } + } + + /** + * Wait for all the specified units of work to finish. + * + * @param works Units of the work that need to finish. + * @param timeout Timeout for waiting for the units of work to finish. + */ + public boolean waitForAll(Collection works, long timeout) { + throw new UnsupportedOperationException("waitForAll not supported"); + } + + /** + * Wait for any of the specified units of work to finish. + * + * @param works Units of the work that need to finish. + * @param timeout Timeout for waiting for the units of work to finish. + */ + public Collection waitForAny(Collection works, long timeout) { + throw new UnsupportedOperationException("waitForAny not supported"); + } + + /** + * Method provided for subclasses to indicate a work acceptance. + * + * @param workItem Work item representing the work that was accepted. + * @param work Work that was accepted. + */ + private void workAccepted(final WorkItem workItem, final Work work) { + WorkListener listener = workItems.get(workItem); + if (listener != null) { + workItem.setStatus(WorkEvent.WORK_ACCEPTED); + WorkEvent event = new WorkEvent(workItem); + listener.workAccepted(event); + } + } + + /* + * Method to indicate a work start. + */ + private void workStarted(final WorkItem workItem, final Work work) { + WorkListener listener = workItems.get(workItem); + if (listener != null) { + workItem.setStatus(WorkEvent.WORK_STARTED); + WorkEvent event = new WorkEvent(workItem); + listener.workStarted(event); + } + } + + /* + * Method to indicate a work completion. + */ + private void workCompleted(final WorkItem workItem, final Work work) { + workCompleted(workItem, work, null); + } + + /* + * Method to indicate a work completion. + */ + private void workCompleted(final WorkItem workItem, final Work work, final WorkSchedulerException exception) { + WorkListener listener = workItems.get(workItem); + if (listener != null) { + workItem.setStatus(WorkEvent.WORK_COMPLETED); + workItem.setResult(work); + workItem.setException(exception); + WorkEvent event = new WorkEvent(workItem); + listener.workCompleted(event); + workItems.remove(workItem); + } + } + + /* + * Schedules the work using the ThreadPool. + */ + private boolean scheduleWork(final Work work, final WorkItem workItem) { + try { + executor.execute(new DecoratingWork(workItem, work)); + return true; + } catch (RejectedExecutionException ex) { + return false; + } + } + + /* + * Class that decorates the original worker so that it can get callbacks when work is done. + */ + private final class DecoratingWork implements Runnable { + + // Work item for this work. + private WorkItem workItem; + + // The original work. + private Work decoratedWork; + + /* + * Initializes the work item and underlying work. + */ + private DecoratingWork(final WorkItem workItem, final Work decoratedWork) { + this.workItem = workItem; + this.decoratedWork = decoratedWork; + } + + /* + * Overrides the run method. + */ + public void run() { + workStarted(workItem, decoratedWork); + try { + decoratedWork.run(); + workCompleted(workItem, decoratedWork); + } catch (Throwable th) { + workCompleted(workItem, decoratedWork, new WorkSchedulerException(th.getMessage(), th)); + } + } + + } + + public void destroy() { + executor.shutdown(); + } + + public ExecutorService getExecutorService() { + return executor; + } + +} + diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/Work.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/Work.java new file mode 100644 index 0000000000..ca06d0e854 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/Work.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.work.impl; + +/** + * JCA work wrapper. + * + * @version $Rev$ $Date$ + */ +public class Work { + + // Work that is being executed. + private T work; + + /* + * Initializes the work instance. + */ + public Work(T work) { + this.work = work; + } + + /* + * Returns the completed work. + */ + public T getWork() { + return work; + } + + /* + * Release the work. + */ + public void release() { + } + + /* + * Work attributes are not daemon. + */ + public boolean isDaemon() { + return false; + } + + /* + * Runs the work. + */ + public void run() { + work.run(); + } +} \ No newline at end of file diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/WorkEvent.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/WorkEvent.java new file mode 100644 index 0000000000..4580011806 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/WorkEvent.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.work.impl; + +import org.apache.tuscany.sca.work.WorkSchedulerException; + + + +/** + * Default immutable implementation of the WorkEvent class. + * + * @version $Rev$ $Date$ + */ +class WorkEvent { + + public static final int WORK_ACCEPTED = 1; + public static final int WORK_REJECTED = 2; + public static final int WORK_STARTED = 3; + public static final int WORK_COMPLETED = 4; + + // Work item for this event + private WorkItem workItem; + + // Exception if something has gone wrong + private WorkSchedulerException exception; + + /** + * Instantiates the event. + * + * @param workItem Work item for this event. + */ + public WorkEvent(final WorkItem workItem) { + this.workItem = workItem; + this.exception = workItem.getException(); + } + + /** + * Returns the work type based on whether the work was accepted, started, + * rejected or completed. + * + * @return Work type. + */ + public int getType() { + return workItem.getStatus(); + } + + /** + * Returns the work item associated with this work type. + * + * @return Work item. + */ + public WorkItem getWorkItem() { + return workItem; + } + + /** + * Returns the exception if the work completed with an exception. + * + * @return Work exception. + */ + public WorkSchedulerException getException() { + return exception; + } +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/WorkItem.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/WorkItem.java new file mode 100644 index 0000000000..0fc104d0fc --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/WorkItem.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tuscany.sca.core.work.impl; + +import org.apache.tuscany.sca.work.WorkSchedulerException; + +/** + * An identity based immutable implementation of the WorkItem + * interface. + * + * @version $Rev$ $Date$ + */ +class WorkItem { + + // Id scoped for the VM + private String id; + + // Status + private int status = -1; + + // Result + private Work result; + + // Original work + private Work originalWork; + + // Exception + private WorkSchedulerException exception; + + /** + * Instantiates an id for this item. + * + * @param id of this work event. + */ + protected WorkItem(final String id, final Work orginalWork) { + this.id = id; + this.originalWork = orginalWork; + } + + /** + * Returns the id. + * + * @return Id of this item. + */ + public String getId() { + return id; + } + + /** + * Returns the original work. + * + * @return Original work. + */ + public Work getOriginalWork() { + return originalWork; + } + + /** + * Returns the work result if the work completed. + * + * @return Work. + * @throws WorkException If the work completed with an exception. + */ + public Work getResult() { + return result; + } + + /** + * Sets the result. + * + * @param result Result. + */ + protected void setResult(final Work result) { + this.result = result; + } + + /** + * Returns the exception if work completed with an exception. + * + * @return Work exception. + */ + protected WorkSchedulerException getException() { + return exception; + } + + /** + * Sets the exception. + * + * @param exception Exception. + */ + protected void setException(final WorkSchedulerException exception) { + this.exception = exception; + } + + /** + * Returns the work type based on whether the work was accepted, started, + * rejected or completed. + * + * @return Work status. + */ + public int getStatus() { + return status; + } + + /** + * Sets the status. + * + * @param status Status. + */ + protected void setStatus(final int status) { + this.status = status; + } + + /** + * @see Object#hashCode() + */ + @Override + public int hashCode() { + return id.hashCode(); + } + + /** + * Indicates whether some other object is "equal to" this one. + * + * @param obj Object to be compared. + * @return true if this object is the same as the obj argument; false + * otherwise.. + */ + @Override + public boolean equals(final Object obj) { + return (obj != null) && (obj.getClass() == WorkItem.class) && ((WorkItem) obj).id.equals(id); + } + + /** + * Compares this object with the specified object for order. Returns a + * negative integer, zero, or a positive integer as this object is less + * than, equal to, or greater than the specified object. + * + * @param o Object to be compared. + * @return A negative integer, zero, or a positive integer as this object + * is less than, equal to, or greater than the specified object. + * @throws ClassCastException needs better documentation. + */ + public int compareTo(final Object o) { + if (o.getClass() != WorkItem.class) { + throw new ClassCastException(o.getClass().getName()); + } else { + return ((WorkItem) o).getId().compareTo(getId()); + } + } +} diff --git a/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/WorkListener.java b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/WorkListener.java new file mode 100644 index 0000000000..38bbaeebe0 --- /dev/null +++ b/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/WorkListener.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.core.work.impl; + +public interface WorkListener { + + long IMMEDIATE = 0; + long INDEFINITE = java.lang.Long.MAX_VALUE; + + void workAccepted(WorkEvent event); + void workCompleted(WorkEvent event); + void workRejected(WorkEvent event); + void workStarted(WorkEvent event); + +} -- cgit v1.2.3