summaryrefslogtreecommitdiffstats
path: root/java/sca/modules/core
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--java/sca/modules/core-spi/src/main/java/org/apache/tuscany/sca/work/WorkScheduler.java31
-rw-r--r--java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ConversationManager.java13
-rw-r--r--java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/impl/ConversationManagerImpl.java24
-rw-r--r--java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/DefaultWorkScheduler.java16
-rw-r--r--java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManager.java53
-rw-r--r--java/sca/modules/core/src/test/java/org/apache/tuscany/sca/core/work/impl/Jsr237WorkSchedulerTestCase.java8
-rw-r--r--java/sca/modules/core/src/test/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManagerTestCase.java28
7 files changed, 84 insertions, 89 deletions
diff --git a/java/sca/modules/core-spi/src/main/java/org/apache/tuscany/sca/work/WorkScheduler.java b/java/sca/modules/core-spi/src/main/java/org/apache/tuscany/sca/work/WorkScheduler.java
index b1beac78e2..6d18b48002 100644
--- a/java/sca/modules/core-spi/src/main/java/org/apache/tuscany/sca/work/WorkScheduler.java
+++ b/java/sca/modules/core-spi/src/main/java/org/apache/tuscany/sca/work/WorkScheduler.java
@@ -6,53 +6,48 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.tuscany.sca.work;
/**
* Defines the contract for scheduling asynchronous units of work.
- *
+ *
* <p>
* Units of work can be scheduled with an optional <code>NotificationListener</code>.
- * If a notification listener is specified, the caller will be notified regarding the
- * status of the work. The unit of work can either be completed, rejected or completed
- * with an error. If the work completed with an error, the caller is notified with the
+ * If a notification listener is specified, the caller will be notified regarding the
+ * status of the work. The unit of work can either be completed, rejected or completed
+ * with an error. If the work completed with an error, the caller is notified with the
* error details.
* </p>
*
* @version $Rev$ $Date$
*/
public interface WorkScheduler {
-
+
/**
- * Schedules a unit of work for future execution. The notification listener
+ * Schedules a unit of work for future execution. The notification listener
* is used to register interest in callbacks regarding the status of the work.
- *
+ *
* @param work The unit of work that needs to be asynchronously executed.
* @param listener Notification listener for callbacks.
*/
<T extends Runnable>void scheduleWork(T work, NotificationListener<T> listener);
-
+
/**
- * Schedules a unit of work for future execution. The notification listener
+ * Schedules a unit of work for future execution. The notification listener
* is used to register interest in callbacks regarding the status of the work.
- *
+ *
* @param work The unit of work that needs to be asynchronously executed.
*/
<T extends Runnable>void scheduleWork(T work);
- /**
- * Destroys the work scheduler
- */
- void destroy();
-
}
diff --git a/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ConversationManager.java b/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ConversationManager.java
index ed858bd499..89987a1117 100644
--- a/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ConversationManager.java
+++ b/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ConversationManager.java
@@ -6,22 +6,22 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.tuscany.sca.core.conversation;
/**
* The manager of conversations
- *
+ *
* @version $Rev$ $Date$
*/
public interface ConversationManager {
@@ -58,14 +58,15 @@ public interface ConversationManager {
* @param listener
*/
void removeListener(ConversationListener listener);
-
+
/**
* @return the default max age for a conversation
*/
long getMaxAge();
-
+
/**
* @return the default max idle time for a conversation
*/
long getMaxIdleTime();
+
}
diff --git a/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/impl/ConversationManagerImpl.java b/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/impl/ConversationManagerImpl.java
index e23659b990..d2dff6ea09 100644
--- a/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/impl/ConversationManagerImpl.java
+++ b/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/impl/ConversationManagerImpl.java
@@ -6,15 +6,15 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.tuscany.sca.core.conversation.impl;
@@ -31,6 +31,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.tuscany.sca.core.LifeCycleListener;
import org.apache.tuscany.sca.core.conversation.ConversationExt;
import org.apache.tuscany.sca.core.conversation.ConversationListener;
import org.apache.tuscany.sca.core.conversation.ConversationManager;
@@ -39,7 +40,7 @@ import org.apache.tuscany.sca.core.conversation.ConversationState;
/**
* @version $Rev$ $Date$
*/
-public class ConversationManagerImpl implements ConversationManager {
+public class ConversationManagerImpl implements ConversationManager, LifeCycleListener {
private List<ConversationListener> listeners = Collections.synchronizedList(new ArrayList<ConversationListener>());
private Map<Object, ConversationExt> conversations = new ConcurrentHashMap<Object, ConversationExt>();
@@ -174,8 +175,8 @@ public class ConversationManagerImpl implements ConversationManager {
*/
public synchronized void stopReaper() {
- // Prevent the scheduler from submitting any additional reapers,
- // initiate an orderly shutdown if a reaper task is in progress.
+ // Prevent the scheduler from submitting any additional reapers,
+ // initiate an orderly shutdown if a reaper task is in progress.
this.scheduler.shutdown();
}
@@ -215,4 +216,15 @@ public class ConversationManagerImpl implements ConversationManager {
public long getMaxAge() {
return maxAge;
}
+
+ public void stop() {
+ // REVIEW: A more graceful way?
+ scheduler.shutdownNow();
+ this.listeners.clear();
+ this.conversations.clear();
+ }
+
+ public void start() {
+ }
+
}
diff --git a/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/DefaultWorkScheduler.java b/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/DefaultWorkScheduler.java
index 3df2f7188b..9e6f3b3087 100644
--- a/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/DefaultWorkScheduler.java
+++ b/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/DefaultWorkScheduler.java
@@ -6,21 +6,22 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.tuscany.sca.core.work.impl;
import java.security.AccessController;
import java.security.PrivilegedAction;
+import org.apache.tuscany.sca.core.LifeCycleListener;
import org.apache.tuscany.sca.work.NotificationListener;
import org.apache.tuscany.sca.work.WorkScheduler;
import org.apache.tuscany.sca.work.WorkSchedulerException;
@@ -36,7 +37,7 @@ import org.apache.tuscany.sca.work.WorkSchedulerException;
*
* @version $Rev$ $Date$
*/
-public class DefaultWorkScheduler implements WorkScheduler {
+public class DefaultWorkScheduler implements WorkScheduler, LifeCycleListener {
/**
* Underlying JSR-237 work manager
@@ -62,7 +63,7 @@ public class DefaultWorkScheduler implements WorkScheduler {
// // ignore
// }
if (jsr237WorkManager == null) {
- jsr237WorkManager = new ThreadPoolWorkManager(10);
+ jsr237WorkManager = new ThreadPoolWorkManager(0);
}
return jsr237WorkManager;
}
@@ -110,7 +111,10 @@ public class DefaultWorkScheduler implements WorkScheduler {
}
- public void destroy() {
+ public void start() {
+ }
+
+ public void stop() {
if (jsr237WorkManager instanceof ThreadPoolWorkManager) {
// Allow privileged access to modify threads. Requires RuntimePermission in security
// policy.
diff --git a/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManager.java b/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManager.java
index 12fa4a485d..e7728ca9a9 100644
--- a/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManager.java
+++ b/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManager.java
@@ -6,21 +6,21 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.tuscany.sca.core.work.impl;
-import java.rmi.server.UID;
import java.util.Collection;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -28,7 +28,6 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import org.apache.tuscany.sca.work.WorkSchedulerException;
-import org.oasisopen.sca.annotation.Destroy;
/**
* A thread-pool based implementation for the JSR-237 work manager.
@@ -51,24 +50,24 @@ public class ThreadPoolWorkManager {
/**
* Initializes the thread-pool.
*
- * @param threadPoolSize Thread-pool size.
- * @throws IllegalArgumentException if threadPoolSize < 1
+ * @param threadPoolSize Thread-pool size. If the size <1, then a cached pool is created
*/
public ThreadPoolWorkManager(int threadPoolSize) {
- if (threadPoolSize < 1) {
- throw new IllegalArgumentException("Invalid threadPoolSize of "
- + threadPoolSize + ". It must be >= 1");
- }
-
- // Creates a new Executor, use a custom ThreadFactory that
- // creates daemon threads.
- executor = Executors.newFixedThreadPool(threadPoolSize, new ThreadFactory() {
+ ThreadFactory factory = new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
return thread;
}
- });
+ };
+ if (threadPoolSize <= 0) {
+
+ // Creates a new Executor, use a custom ThreadFactory that
+ // creates daemon threads.
+ executor = Executors.newCachedThreadPool(factory);
+ } else {
+ executor = Executors.newFixedThreadPool(threadPoolSize);
+ }
}
/**
@@ -77,7 +76,7 @@ public class ThreadPoolWorkManager {
* @param work Work that needs to be scheduled.
* @return Work Work item representing the asynchronous work
*/
- public WorkItem schedule(Work<?> work) throws IllegalArgumentException {
+ public WorkItem schedule(Work work) throws IllegalArgumentException {
return schedule(work, null);
}
@@ -88,9 +87,9 @@ public class ThreadPoolWorkManager {
* @param workListener Work listener for callbacks.
* @return Work Work item representing the asynchronous work
*/
- public WorkItem schedule(Work<?> work, WorkListener workListener) throws IllegalArgumentException {
+ public WorkItem schedule(Work work, WorkListener workListener) throws IllegalArgumentException {
- WorkItem workItem = new WorkItem(new UID().toString(), work);
+ WorkItem workItem = new WorkItem(UUID.randomUUID().toString(), work);
if (workListener != null) {
workItems.put(workItem, workListener);
}
@@ -112,7 +111,7 @@ public class ThreadPoolWorkManager {
* @param works Units of the work that need to finish.
* @param timeout Timeout for waiting for the units of work to finish.
*/
- public boolean waitForAll(Collection<Work<?>> works, long timeout) {
+ public boolean waitForAll(Collection works, long timeout) {
throw new UnsupportedOperationException("waitForAll not supported");
}
@@ -122,7 +121,7 @@ public class ThreadPoolWorkManager {
* @param works Units of the work that need to finish.
* @param timeout Timeout for waiting for the units of work to finish.
*/
- public Collection<Work<?>> waitForAny(Collection<Work<?>> works, long timeout) {
+ public Collection waitForAny(Collection works, long timeout) {
throw new UnsupportedOperationException("waitForAny not supported");
}
@@ -132,7 +131,7 @@ public class ThreadPoolWorkManager {
* @param workItem Work item representing the work that was accepted.
* @param work Work that was accepted.
*/
- private void workAccepted(final WorkItem workItem, final Work<?> work) {
+ private void workAccepted(final WorkItem workItem, final Work work) {
WorkListener listener = workItems.get(workItem);
if (listener != null) {
workItem.setStatus(WorkEvent.WORK_ACCEPTED);
@@ -144,7 +143,7 @@ public class ThreadPoolWorkManager {
/*
* Method to indicate a work start.
*/
- private void workStarted(final WorkItem workItem, final Work<?> work) {
+ private void workStarted(final WorkItem workItem, final Work work) {
WorkListener listener = workItems.get(workItem);
if (listener != null) {
workItem.setStatus(WorkEvent.WORK_STARTED);
@@ -156,14 +155,14 @@ public class ThreadPoolWorkManager {
/*
* Method to indicate a work completion.
*/
- private void workCompleted(final WorkItem workItem, final Work<?> work) {
+ private void workCompleted(final WorkItem workItem, final Work work) {
workCompleted(workItem, work, null);
}
/*
* Method to indicate a work completion.
*/
- private void workCompleted(final WorkItem workItem, final Work<?> work, final WorkSchedulerException exception) {
+ private void workCompleted(final WorkItem workItem, final Work work, final WorkSchedulerException exception) {
WorkListener listener = workItems.get(workItem);
if (listener != null) {
workItem.setStatus(WorkEvent.WORK_COMPLETED);
@@ -178,7 +177,7 @@ public class ThreadPoolWorkManager {
/*
* Schedules the work using the ThreadPool.
*/
- private boolean scheduleWork(final Work<?> work, final WorkItem workItem) {
+ private boolean scheduleWork(final Work work, final WorkItem workItem) {
try {
executor.execute(new DecoratingWork(workItem, work));
return true;
@@ -221,9 +220,9 @@ public class ThreadPoolWorkManager {
}
- @Destroy
public void destroy() {
executor.shutdown();
}
}
+
diff --git a/java/sca/modules/core/src/test/java/org/apache/tuscany/sca/core/work/impl/Jsr237WorkSchedulerTestCase.java b/java/sca/modules/core/src/test/java/org/apache/tuscany/sca/core/work/impl/Jsr237WorkSchedulerTestCase.java
index 904f4ca5e0..c0f35d427c 100644
--- a/java/sca/modules/core/src/test/java/org/apache/tuscany/sca/core/work/impl/Jsr237WorkSchedulerTestCase.java
+++ b/java/sca/modules/core/src/test/java/org/apache/tuscany/sca/core/work/impl/Jsr237WorkSchedulerTestCase.java
@@ -6,15 +6,15 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.tuscany.sca.core.work.impl;
@@ -54,7 +54,7 @@ public class Jsr237WorkSchedulerTestCase {
@AfterClass
public static void destroy() {
if (workSchedular != null) {
- workSchedular.destroy();
+ workSchedular.stop();
}
}
diff --git a/java/sca/modules/core/src/test/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManagerTestCase.java b/java/sca/modules/core/src/test/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManagerTestCase.java
index b66fa75828..122609d7e6 100644
--- a/java/sca/modules/core/src/test/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManagerTestCase.java
+++ b/java/sca/modules/core/src/test/java/org/apache/tuscany/sca/core/work/impl/ThreadPoolWorkManagerTestCase.java
@@ -6,15 +6,15 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
package org.apache.tuscany.sca.core.work.impl;
@@ -25,7 +25,7 @@ import org.junit.Test;
/**
* This test case will test the ThreadPoolWorkManager
- *
+ *
* @version $Rev$ $Date$
*/
public class ThreadPoolWorkManagerTestCase {
@@ -130,7 +130,7 @@ public class ThreadPoolWorkManagerTestCase {
}
/**
- * Tests running a mixture of fast and slow jobs some of which fail on the
+ * Tests running a mixture of fast and slow jobs some of which fail on the
* ThreadPoolWorkManager
*/
@Test
@@ -164,22 +164,6 @@ public class ThreadPoolWorkManagerTestCase {
}
/**
- * Tests creating a ThreadPoolWorkManager with invalid pool sizes of -10 to 0
- * inclusive
- */
- @Test
- public void testThreadPoolWorkManagerLessThan1Size() {
- for (int i = 0; i >= -10; i--) {
- try {
- new ThreadPoolWorkManager(i);
- Assert.fail("Should have thrown IllegalArgumentException");
- } catch (IllegalArgumentException ex) {
- Assert.assertTrue(ex.toString().indexOf(Integer.toString(i)) != -1);
- }
- }
- }
-
- /**
* Tests running a single job that has no listener
*/
@Test
@@ -215,7 +199,7 @@ public class ThreadPoolWorkManagerTestCase {
/**
* Waits for the specified number of jobs to complete or the timeout to fire.
- *
+ *
* @param listener The listener to use to track Work unit completion
* @param completedWorkItemsToWaitFor The number of Work items to complete
*/