diff options
Diffstat (limited to '')
7 files changed, 84 insertions, 89 deletions
diff --git a/java/sca/modules/core-spi/src/main/java/org/apache/tuscany/sca/work/WorkScheduler.java b/java/sca/modules/core-spi/src/main/java/org/apache/tuscany/sca/work/WorkScheduler.java index b1beac78e2..6d18b48002 100644 --- a/java/sca/modules/core-spi/src/main/java/org/apache/tuscany/sca/work/WorkScheduler.java +++ b/java/sca/modules/core-spi/src/main/java/org/apache/tuscany/sca/work/WorkScheduler.java @@ -6,53 +6,48 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. */ package org.apache.tuscany.sca.work; /** * Defines the contract for scheduling asynchronous units of work. - * + * * <p> * Units of work can be scheduled with an optional <code>NotificationListener</code>. - * If a notification listener is specified, the caller will be notified regarding the - * status of the work. The unit of work can either be completed, rejected or completed - * with an error. If the work completed with an error, the caller is notified with the + * If a notification listener is specified, the caller will be notified regarding the + * status of the work. The unit of work can either be completed, rejected or completed + * with an error. If the work completed with an error, the caller is notified with the * error details. * </p> * * @version $Rev$ $Date$ */ public interface WorkScheduler { - + /** - * Schedules a unit of work for future execution. The notification listener + * Schedules a unit of work for future execution. The notification listener * is used to register interest in callbacks regarding the status of the work. - * + * * @param work The unit of work that needs to be asynchronously executed. * @param listener Notification listener for callbacks. */ <T extends Runnable>void scheduleWork(T work, NotificationListener<T> listener); - + /** - * Schedules a unit of work for future execution. The notification listener + * Schedules a unit of work for future execution. The notification listener * is used to register interest in callbacks regarding the status of the work. - * + * * @param work The unit of work that needs to be asynchronously executed. */ <T extends Runnable>void scheduleWork(T work); - /** - * Destroys the work scheduler - */ - void destroy(); - } diff --git a/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ConversationManager.java b/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ConversationManager.java index ed858bd499..89987a1117 100644 --- a/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ConversationManager.java +++ b/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ConversationManager.java @@ -6,22 +6,22 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. */ package org.apache.tuscany.sca.core.conversation; /** * The manager of conversations - * + * * @version $Rev$ $Date$ */ public interface ConversationManager { @@ -58,14 +58,15 @@ public interface ConversationManager { * @param listener */ void removeListener(ConversationListener listener); - + /** * @return the default max age for a conversation */ long getMaxAge(); - + /** * @return the default max idle time for a conversation */ long getMaxIdleTime(); + } diff --git a/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/impl/ConversationManagerImpl.java b/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/impl/ConversationManagerImpl.java index e23659b990..d2dff6ea09 100644 --- a/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/impl/ConversationManagerImpl.java +++ b/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/impl/ConversationManagerImpl.java @@ -6,15 +6,15 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. */ package org.apache.tuscany.sca.core.conversation.impl; @@ -31,6 +31,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.tuscany.sca.core.LifeCycleListener; import org.apache.tuscany.sca.core.conversation.ConversationExt; import org.apache.tuscany.sca.core.conversation.ConversationListener; import org.apache.tuscany.sca.core.conversation.ConversationManager; @@ -39,7 +40,7 @@ import org.apache.tuscany.sca.core.conversation.ConversationState; /** * @version $Rev$ $Date$ */ -public class ConversationManagerImpl implements ConversationManager { +public class ConversationManagerImpl implements ConversationManager, LifeCycleListener { private List<ConversationListener> listeners = Collections.synchronizedList(new ArrayList<ConversationListener>()); private Map<Object, ConversationExt> conversations = new ConcurrentHashMap<Object, ConversationExt>(); @@ -174,8 +175,8 @@ public class ConversationManagerImpl implements ConversationManager { */ public synchronized void stopReaper() { - // Prevent the scheduler from submitting any additional reapers, - // initiate an orderly shutdown if a reaper task is in progress. + // Prevent the scheduler from submitting any additional reapers, + // initiate an orderly shutdown if a reaper task is in progress. this.scheduler.shutdown(); } @@ -215,4 +216,15 @@ public class ConversationManagerImpl implements ConversationManager { public long getMaxAge() { return maxAge; } + + public void stop() { + // REVIEW: A more graceful way? + scheduler.shutdownNow(); + this.listeners.clear(); + this.conversations.clear(); + } + + public void start() { + } + } diff --git a/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/DefaultWorkScheduler.java b/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/DefaultWorkScheduler.java index 3df2f7188b..9e6f3b3087 100644 --- a/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/DefaultWorkScheduler.java +++ b/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/DefaultWorkScheduler.java @@ -6,21 +6,22 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. */ package org.apache.tuscany.sca.core.work.impl; import java.security.AccessController; import java.security.PrivilegedAction; +import org.apache.tuscany.sca.core.LifeCycleListener; import org.apache.tuscany.sca.work.NotificationListener; import org.apache.tuscany.sca.work.WorkScheduler; import org.apache.tuscany.sca.work.WorkSchedulerException; @@ -36,7 +37,7 @@ import org.apache.tuscany.sca.work.WorkSchedulerException; * * @version $Rev$ $Date$ */ -public class DefaultWorkScheduler implements WorkScheduler { +public class DefaultWorkScheduler implements WorkScheduler, LifeCycleListener { /** * Underlying JSR-237 work manager @@ -62,7 +63,7 @@ public class DefaultWorkScheduler implements WorkScheduler { // // ignore // } if (jsr237WorkManager == null) { - jsr237WorkManager = new ThreadPoolWorkManager(10); + jsr237WorkManager = new ThreadPoolWorkManager(0); } return jsr237WorkManager; } @@ -110,7 +111,10 @@ public class DefaultWorkScheduler implements WorkScheduler { } - public void destroy() { + public void start() { + } + + public void stop() { if (jsr237WorkManager instanceof ThreadPoolWorkManager) { // Allow privileged access to modify threads. Requires RuntimePermission in security // policy. diff --git a/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManager.java b/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManager.java index 12fa4a485d..e7728ca9a9 100644 --- a/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManager.java +++ b/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManager.java @@ -6,21 +6,21 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. */ package org.apache.tuscany.sca.core.work.impl; -import java.rmi.server.UID; import java.util.Collection; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -28,7 +28,6 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import org.apache.tuscany.sca.work.WorkSchedulerException; -import org.oasisopen.sca.annotation.Destroy; /** * A thread-pool based implementation for the JSR-237 work manager. @@ -51,24 +50,24 @@ public class ThreadPoolWorkManager { /** * Initializes the thread-pool. * - * @param threadPoolSize Thread-pool size. - * @throws IllegalArgumentException if threadPoolSize < 1 + * @param threadPoolSize Thread-pool size. If the size <1, then a cached pool is created */ public ThreadPoolWorkManager(int threadPoolSize) { - if (threadPoolSize < 1) { - throw new IllegalArgumentException("Invalid threadPoolSize of " - + threadPoolSize + ". It must be >= 1"); - } - - // Creates a new Executor, use a custom ThreadFactory that - // creates daemon threads. - executor = Executors.newFixedThreadPool(threadPoolSize, new ThreadFactory() { + ThreadFactory factory = new ThreadFactory() { public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); return thread; } - }); + }; + if (threadPoolSize <= 0) { + + // Creates a new Executor, use a custom ThreadFactory that + // creates daemon threads. + executor = Executors.newCachedThreadPool(factory); + } else { + executor = Executors.newFixedThreadPool(threadPoolSize); + } } /** @@ -77,7 +76,7 @@ public class ThreadPoolWorkManager { * @param work Work that needs to be scheduled. * @return Work Work item representing the asynchronous work */ - public WorkItem schedule(Work<?> work) throws IllegalArgumentException { + public WorkItem schedule(Work work) throws IllegalArgumentException { return schedule(work, null); } @@ -88,9 +87,9 @@ public class ThreadPoolWorkManager { * @param workListener Work listener for callbacks. * @return Work Work item representing the asynchronous work */ - public WorkItem schedule(Work<?> work, WorkListener workListener) throws IllegalArgumentException { + public WorkItem schedule(Work work, WorkListener workListener) throws IllegalArgumentException { - WorkItem workItem = new WorkItem(new UID().toString(), work); + WorkItem workItem = new WorkItem(UUID.randomUUID().toString(), work); if (workListener != null) { workItems.put(workItem, workListener); } @@ -112,7 +111,7 @@ public class ThreadPoolWorkManager { * @param works Units of the work that need to finish. * @param timeout Timeout for waiting for the units of work to finish. */ - public boolean waitForAll(Collection<Work<?>> works, long timeout) { + public boolean waitForAll(Collection works, long timeout) { throw new UnsupportedOperationException("waitForAll not supported"); } @@ -122,7 +121,7 @@ public class ThreadPoolWorkManager { * @param works Units of the work that need to finish. * @param timeout Timeout for waiting for the units of work to finish. */ - public Collection<Work<?>> waitForAny(Collection<Work<?>> works, long timeout) { + public Collection waitForAny(Collection works, long timeout) { throw new UnsupportedOperationException("waitForAny not supported"); } @@ -132,7 +131,7 @@ public class ThreadPoolWorkManager { * @param workItem Work item representing the work that was accepted. * @param work Work that was accepted. */ - private void workAccepted(final WorkItem workItem, final Work<?> work) { + private void workAccepted(final WorkItem workItem, final Work work) { WorkListener listener = workItems.get(workItem); if (listener != null) { workItem.setStatus(WorkEvent.WORK_ACCEPTED); @@ -144,7 +143,7 @@ public class ThreadPoolWorkManager { /* * Method to indicate a work start. */ - private void workStarted(final WorkItem workItem, final Work<?> work) { + private void workStarted(final WorkItem workItem, final Work work) { WorkListener listener = workItems.get(workItem); if (listener != null) { workItem.setStatus(WorkEvent.WORK_STARTED); @@ -156,14 +155,14 @@ public class ThreadPoolWorkManager { /* * Method to indicate a work completion. */ - private void workCompleted(final WorkItem workItem, final Work<?> work) { + private void workCompleted(final WorkItem workItem, final Work work) { workCompleted(workItem, work, null); } /* * Method to indicate a work completion. */ - private void workCompleted(final WorkItem workItem, final Work<?> work, final WorkSchedulerException exception) { + private void workCompleted(final WorkItem workItem, final Work work, final WorkSchedulerException exception) { WorkListener listener = workItems.get(workItem); if (listener != null) { workItem.setStatus(WorkEvent.WORK_COMPLETED); @@ -178,7 +177,7 @@ public class ThreadPoolWorkManager { /* * Schedules the work using the ThreadPool. */ - private boolean scheduleWork(final Work<?> work, final WorkItem workItem) { + private boolean scheduleWork(final Work work, final WorkItem workItem) { try { executor.execute(new DecoratingWork(workItem, work)); return true; @@ -221,9 +220,9 @@ public class ThreadPoolWorkManager { } - @Destroy public void destroy() { executor.shutdown(); } } + diff --git a/java/sca/modules/core/src/test/java/org/apache/tuscany/sca/core/work/impl/Jsr237WorkSchedulerTestCase.java b/java/sca/modules/core/src/test/java/org/apache/tuscany/sca/core/work/impl/Jsr237WorkSchedulerTestCase.java index 904f4ca5e0..c0f35d427c 100644 --- a/java/sca/modules/core/src/test/java/org/apache/tuscany/sca/core/work/impl/Jsr237WorkSchedulerTestCase.java +++ b/java/sca/modules/core/src/test/java/org/apache/tuscany/sca/core/work/impl/Jsr237WorkSchedulerTestCase.java @@ -6,15 +6,15 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. */ package org.apache.tuscany.sca.core.work.impl; @@ -54,7 +54,7 @@ public class Jsr237WorkSchedulerTestCase { @AfterClass public static void destroy() { if (workSchedular != null) { - workSchedular.destroy(); + workSchedular.stop(); } } diff --git a/java/sca/modules/core/src/test/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManagerTestCase.java b/java/sca/modules/core/src/test/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManagerTestCase.java index b66fa75828..122609d7e6 100644 --- a/java/sca/modules/core/src/test/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManagerTestCase.java +++ b/java/sca/modules/core/src/test/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManagerTestCase.java @@ -6,15 +6,15 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. */ package org.apache.tuscany.sca.core.work.impl; @@ -25,7 +25,7 @@ import org.junit.Test; /** * This test case will test the ThreadPoolWorkManager - * + * * @version $Rev$ $Date$ */ public class ThreadPoolWorkManagerTestCase { @@ -130,7 +130,7 @@ public class ThreadPoolWorkManagerTestCase { } /** - * Tests running a mixture of fast and slow jobs some of which fail on the + * Tests running a mixture of fast and slow jobs some of which fail on the * ThreadPoolWorkManager */ @Test @@ -164,22 +164,6 @@ public class ThreadPoolWorkManagerTestCase { } /** - * Tests creating a ThreadPoolWorkManager with invalid pool sizes of -10 to 0 - * inclusive - */ - @Test - public void testThreadPoolWorkManagerLessThan1Size() { - for (int i = 0; i >= -10; i--) { - try { - new ThreadPoolWorkManager(i); - Assert.fail("Should have thrown IllegalArgumentException"); - } catch (IllegalArgumentException ex) { - Assert.assertTrue(ex.toString().indexOf(Integer.toString(i)) != -1); - } - } - } - - /** * Tests running a single job that has no listener */ @Test @@ -215,7 +199,7 @@ public class ThreadPoolWorkManagerTestCase { /** * Waits for the specified number of jobs to complete or the timeout to fire. - * + * * @param listener The listener to use to track Work unit completion * @param completedWorkItemsToWaitFor The number of Work items to complete */ |