diff options
Diffstat (limited to 'sandbox/sebastien/java/embed/itest/callback-separatethread/src/main')
6 files changed, 528 insertions, 0 deletions
diff --git a/sandbox/sebastien/java/embed/itest/callback-separatethread/src/main/java/org/apache/tuscany/sca/itest/CallBackSeparateThreadClient.java b/sandbox/sebastien/java/embed/itest/callback-separatethread/src/main/java/org/apache/tuscany/sca/itest/CallBackSeparateThreadClient.java new file mode 100644 index 0000000000..dc66003604 --- /dev/null +++ b/sandbox/sebastien/java/embed/itest/callback-separatethread/src/main/java/org/apache/tuscany/sca/itest/CallBackSeparateThreadClient.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.itest; + +import org.oasisopen.sca.annotation.Remotable; + +/** + * This is the client interface for the call backs in a separate thread tests + */ +@Remotable +public interface CallBackSeparateThreadClient { + + /** + * This tests call back patterns using separate threads. + */ + void runTests(); +} diff --git a/sandbox/sebastien/java/embed/itest/callback-separatethread/src/main/java/org/apache/tuscany/sca/itest/CallBackSeparateThreadClientImpl.java b/sandbox/sebastien/java/embed/itest/callback-separatethread/src/main/java/org/apache/tuscany/sca/itest/CallBackSeparateThreadClientImpl.java new file mode 100644 index 0000000000..f03144db8a --- /dev/null +++ b/sandbox/sebastien/java/embed/itest/callback-separatethread/src/main/java/org/apache/tuscany/sca/itest/CallBackSeparateThreadClientImpl.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.itest; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Assert; +import org.oasisopen.sca.annotation.Reference; +import org.oasisopen.sca.annotation.Service; + +/** + * This is the client implementation for the call backs in a separate thread tests + */ +@Service(CallBackSeparateThreadClient.class) +public class CallBackSeparateThreadClientImpl implements CallBackSeparateThreadClient, EventProcessorCallBack { + /** + * Max time to wait to receive events. If not all the events are received then timeout. + */ + private static final int TIMEOUT = 30 * 1000; + + /** + * Counts the number of fast call backs. + */ + private static final AtomicInteger FAST_CALLBACK_COUNT = new AtomicInteger(); + + /** + * Counts the number of slow call backs. + */ + private static final AtomicInteger SLOW_CALLBACK_COUNT = new AtomicInteger(); + + /** + * This is our injected reference to the EventProcessorService + */ + @Reference + protected EventProcessorService aCallBackService; + + /** + * This tests call back patterns using separate threads. + */ + public void runTests() { + try { + // Register for fast call back + registerForFastCallback(); + + // Wait for a few fast call backs + System.out.println("Waiting for some fast call backs"); + waitForSomeFastCallbacks(); + + try { + // Register for slow call back + registerForSlowCallback(); + + // Wait for a few fast call backs + System.out.println("Waiting for some fast calls"); + waitForSomeFastCallbacks(); + + // Wait for a few slow call backs + System.out.println("Waiting for some slow calls"); + waitForSomeSlowCallbacks(); + } finally { + unregisterForSlowCallback(); + } + + System.out.println("Done"); + } finally { + unregisterForFastCallback(); + } + } + + /** + * Waits for some fast call backs to be fired + */ + private void waitForSomeFastCallbacks() { + // Reset the fast call back count + FAST_CALLBACK_COUNT.set(0); + + // Wait until we have 10 fast call backs or timeout occurs + final long start = System.currentTimeMillis(); + do { + if (FAST_CALLBACK_COUNT.get() >= 10) { + System.out.println("Received enough fast notifications"); + return; + } + + try { + Thread.sleep(5); + } catch (InterruptedException e) { + Assert.fail("Unexpeceted exception " + e); + } + } while (System.currentTimeMillis() - start < TIMEOUT); + + // If we get to here then we did not receive enough events + Assert.fail("Did not receive enough fast events"); + } + + /** + * Waits for some slow call backs to be fired + */ + private void waitForSomeSlowCallbacks() { + // Reset the slow call back count + SLOW_CALLBACK_COUNT.set(0); + + // Wait until we have 4 slow call backs or timeout + final long start = System.currentTimeMillis(); + do { + if (SLOW_CALLBACK_COUNT.get() >= 4) { + System.out.println("Received enough slow notifications"); + return; + } + + try { + Thread.sleep(5); + } catch (InterruptedException e) { + Assert.fail("Unexpeceted exception " + e); + } + } while (System.currentTimeMillis() - start < TIMEOUT); + + // If we get to here then we did not receive enough events + Assert.fail("Did not receive enough slow events"); + } + + /** + * Register to receive fast call backs + */ + private void registerForFastCallback() { + aCallBackService.registerForEvent("FAST"); + } + + /** + * Register to receive slow call backs + */ + private void registerForSlowCallback() { + aCallBackService.registerForEvent("SLOW"); + } + + /** + * Unregister to receive fast call backs + */ + private void unregisterForFastCallback() { + aCallBackService.unregisterForEvent("FAST"); + } + + /** + * Unregister to receive slow call backs + */ + private void unregisterForSlowCallback() { + aCallBackService.unregisterForEvent("SLOW"); + } + + /** + * Method that is called when an Event is delivered. + * + * @param aEventName The name of the Event + * @param aEventData The Event data + */ + public void eventNotification(String aEventName, Object aEventData) { + // System.out.println("Received Event : " + aEventName + " " + aEventData); + + if (aEventName.equals("FAST")) { + final int newValue = FAST_CALLBACK_COUNT.incrementAndGet(); + //System.out.println("Received total of " + newValue + " fast call backs"); + } else if (aEventName.equals("SLOW")) { + final int newValue = SLOW_CALLBACK_COUNT.incrementAndGet(); + //System.out.println("Received total of " + newValue + " slow call backs"); + } else { + System.out.println("Unknown event type of " + aEventName); + } + } +} diff --git a/sandbox/sebastien/java/embed/itest/callback-separatethread/src/main/java/org/apache/tuscany/sca/itest/EventProcessorCallBack.java b/sandbox/sebastien/java/embed/itest/callback-separatethread/src/main/java/org/apache/tuscany/sca/itest/EventProcessorCallBack.java new file mode 100644 index 0000000000..137ea6a658 --- /dev/null +++ b/sandbox/sebastien/java/embed/itest/callback-separatethread/src/main/java/org/apache/tuscany/sca/itest/EventProcessorCallBack.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.itest; + +import org.oasisopen.sca.annotation.Remotable; + +/** + * The call back interface for the EventProcessorService that is implemented + * by the client to receive event notifications + */ +@Remotable +public interface EventProcessorCallBack { + /** + * Call back notifying client of an Event + * + * @param aEventName The name of the Event + * @param aEventData The data for the Event + */ + void eventNotification(String aEventName, Object aEventData); +} diff --git a/sandbox/sebastien/java/embed/itest/callback-separatethread/src/main/java/org/apache/tuscany/sca/itest/EventProcessorService.java b/sandbox/sebastien/java/embed/itest/callback-separatethread/src/main/java/org/apache/tuscany/sca/itest/EventProcessorService.java new file mode 100644 index 0000000000..4bf05fc925 --- /dev/null +++ b/sandbox/sebastien/java/embed/itest/callback-separatethread/src/main/java/org/apache/tuscany/sca/itest/EventProcessorService.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.itest; + +import org.oasisopen.sca.annotation.Callback; +import org.oasisopen.sca.annotation.Remotable; + +/** + * Sample Event Processor Service + */ +@Callback(EventProcessorCallBack.class) +@Remotable +public interface EventProcessorService { + + /** + * Registers the client to receive notifications for the specified event + * + * @param aEventName The name of the Event to register + */ + void registerForEvent(String aEventName); + + /** + * Unregisters the client so it no longer receives notifications for the specified event + * + * @param aEventName The name of the Event to unregister + */ + void unregisterForEvent(String aEventName); +} diff --git a/sandbox/sebastien/java/embed/itest/callback-separatethread/src/main/java/org/apache/tuscany/sca/itest/EventProcessorServiceImpl.java b/sandbox/sebastien/java/embed/itest/callback-separatethread/src/main/java/org/apache/tuscany/sca/itest/EventProcessorServiceImpl.java new file mode 100644 index 0000000000..bec98a49c9 --- /dev/null +++ b/sandbox/sebastien/java/embed/itest/callback-separatethread/src/main/java/org/apache/tuscany/sca/itest/EventProcessorServiceImpl.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tuscany.sca.itest; + +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import org.oasisopen.sca.ServiceReference; +import org.oasisopen.sca.annotation.Callback; +import org.oasisopen.sca.annotation.Destroy; +import org.oasisopen.sca.annotation.Scope; +import org.oasisopen.sca.annotation.Service; + +/** + * Sample Event Processor Service Implementation + */ +@Service(EventProcessorService.class) +@Scope("COMPOSITE") +public class EventProcessorServiceImpl implements EventProcessorService { + + /** + * Reference to the call back + */ + @Callback + protected ServiceReference<EventProcessorCallBack> clientCallback; + + /** + * This map contains the call backs for each of the registered Event names + */ + private final Map<String, ServiceReference<EventProcessorCallBack>> eventListeners; + + /** + * The list of all Event Generators we create + */ + private final EventGenerator[] allEventGenerators; + + /** + * Constructor. Starts the Event Generators + */ + public EventProcessorServiceImpl() { + eventListeners = new ConcurrentHashMap<String, ServiceReference<EventProcessorCallBack>>(); + + // We will simulate an Event generator + allEventGenerators = new EventGenerator[2]; + allEventGenerators[0] = new EventGenerator("FAST", 10); // Generate the FAST event every 10ms + allEventGenerators[1] = new EventGenerator("SLOW", 50); // Generate the SLOW event every 50ms + } + + /** + * Registers the client to receive notifications for the specified event + * + * @param aEventName The name of the Event to register + */ + public void registerForEvent(String aEventName) { + // Register for the Event + eventListeners.put(aEventName, clientCallback); + + // Send the "register" started event to the client + receiveEvent(aEventName, "SameThread: Registered to receive notifications for " + aEventName); + } + + /** + * Unregisters the client so it no longer receives notifications for the specified event + * + * @param aEventName The name of the Event to unregister + */ + public void unregisterForEvent(String aEventName) { + // Send the "register" started event to the client + receiveEvent(aEventName, "SameThread: Unregister from receiving notifications for " + aEventName); + + eventListeners.remove(aEventName); + } + + /** + * This method is called whenever the EventProcessorService receives an Event + * + * @param aEventName The name of the Event received + * @param aEventData The Event data + */ + private void receiveEvent(String aEventName, Object aEventData) { + // Get the listener for the Event + final ServiceReference<EventProcessorCallBack> callback = eventListeners.get(aEventName); + if (callback == null) { + //System.out.println("No registered listeners for " + aEventName); + return; + } + + // Trigger the call back + // System.out.println("Notifying " + callback + " of event " + aEventName); + callback.getService().eventNotification(aEventName, aEventData); + // System.out.println("Done notify " + callback + " of event " + aEventName); + } + + /** + * Shuts down the Event Processor + */ + @Destroy + public void shutdown() { + System.out.println("Shutting down the EventProcessor"); + + // Clear list of call back locations as we don't want to send any more notifications + eventListeners.clear(); + + // Stop the Event Generators + for (EventGenerator generator : allEventGenerators) { + generator.stop(); + } + } + + /** + * Utility class for generating Events + */ + private final class EventGenerator { + /** + * The Timer we are using to generate the events + */ + private final Timer timer = new Timer(); + + /** + * Lock object to ensure that we can cancel the timer cleanly. + */ + private final Object lock = new Object(); + + /** + * Constructor + * + * @param aEventName The name of the Event to generate + * @param frequencyInMilliseconds How frequently we should generate the Events + */ + private EventGenerator(String aEventName, int frequencyInMilliseconds) { + timer.schedule(new EventGeneratorTimerTask(aEventName), + frequencyInMilliseconds, + frequencyInMilliseconds); + } + + /** + * Stop this Event Generator + */ + private void stop() { + synchronized (lock) { + timer.cancel(); + } + } + + /** + * The TimerTask that is invoked by the Timer for the EventGenerator + */ + private final class EventGeneratorTimerTask extends TimerTask { + /** + * The name of the Event we should generate + */ + private final String eventName; + + /** + * Constructor + * + * @param aEventName The name of the Event we should generate + */ + private EventGeneratorTimerTask(String aEventName) { + eventName = aEventName; + } + + /** + * Timer calls this method and it will generate an Event + */ + + public void run() { + synchronized(lock) { + // System.out.println("Generating new event " + eventName); + receiveEvent(eventName, "Separate Thread Notification: " + UUID.randomUUID().toString()); + } + } + } + } +} diff --git a/sandbox/sebastien/java/embed/itest/callback-separatethread/src/main/resources/CallBackSeparateThreadTest.composite b/sandbox/sebastien/java/embed/itest/callback-separatethread/src/main/resources/CallBackSeparateThreadTest.composite new file mode 100644 index 0000000000..b10f3bc771 --- /dev/null +++ b/sandbox/sebastien/java/embed/itest/callback-separatethread/src/main/resources/CallBackSeparateThreadTest.composite @@ -0,0 +1,31 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. +--> +<composite xmlns="http://docs.oasis-open.org/ns/opencsa/sca/200912" targetNamespace="http://callback" name="CallBackSeparateThreadTest"> + + <component name="CallBackSeparateThreadClient"> + <implementation.java class="org.apache.tuscany.sca.itest.CallBackSeparateThreadClientImpl"/> + <reference name="aCallBackService" target="EventProcessorService"/> + </component> + + <component name="EventProcessorService"> + <implementation.java class="org.apache.tuscany.sca.itest.EventProcessorServiceImpl"/> + </component> + +</composite> |