summaryrefslogtreecommitdiffstats
path: root/tags/java/sca/1.5.1/modules/core/src/main/java/org/apache/tuscany/sca/core/scope/ConversationalScopeContainer.java
blob: 3ae1ca51c5dd723b40069ee320a63df225153211 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.    
 */

package org.apache.tuscany.sca.core.scope;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.tuscany.sca.core.context.InstanceWrapper;
import org.apache.tuscany.sca.core.conversation.ConversationListener;
import org.apache.tuscany.sca.core.conversation.ConversationManager;
import org.apache.tuscany.sca.core.conversation.ExtendedConversation;
import org.apache.tuscany.sca.core.invocation.ThreadMessageContext;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.store.Store;

/**
 * A scope context which manages atomic component instances keyed on ConversationID
 *
 * @version $Rev$ $Date$
 */
public class ConversationalScopeContainer extends AbstractScopeContainer<Object> implements ConversationListener {
    private ConversationManager conversationManager;
    private Map<Object, InstanceLifeCycleWrapper> instanceLifecycleCollection =
        new ConcurrentHashMap<Object, InstanceLifeCycleWrapper>();

    public ConversationalScopeContainer(Store aStore, RuntimeComponent component) {
        super(Scope.CONVERSATION, component);

        // Note: aStore is here to preserve the original factory interface. It is not currently used in this 
        // implementation since we do not support instance persistence.

        // Check System properties to see if timeout values have been specified. All timeout values 
        // will be specified in seconds.
        //

    }


    protected InstanceWrapper getInstanceWrapper(boolean create, Object contextId) throws TargetResolutionException {

        // we might get a null context if the target service has
        // conversational scope but only its callback interface 
        // is conversational. In this case we need to invent a 
        // conversation Id here to store the service against
        // and populate the thread context
        if (contextId == null) {
            contextId = UUID.randomUUID().toString();
            Message msgContext = ThreadMessageContext.getMessageContext();

            if (msgContext != null) {
                msgContext.getFrom().getReferenceParameters().setConversationID(contextId);
            }
        }    
        
        InstanceLifeCycleWrapper anInstanceWrapper = this.instanceLifecycleCollection.get(contextId);

        if (anInstanceWrapper == null && !create)
            return null;

        if (anInstanceWrapper == null) {
            anInstanceWrapper = new InstanceLifeCycleWrapper(contextId);
            this.instanceLifecycleCollection.put(contextId, anInstanceWrapper);
        }

        return anInstanceWrapper.getInstanceWrapper(contextId);

    }

    @Override
    public InstanceWrapper getWrapper(Object contextId) throws TargetResolutionException {
        return getInstanceWrapper(true, contextId);
    }

    /**
     * This method allows a new context id to be registered alongside an existing one. This happens in
     * one case, when a conversation includes a stateful callback. The client component instance
 	 * must be registered against all outgoing conversation ids so that the component instance 
	 * can be found when the callback arrives
	 * 
     * @param existingContextId the context id against which the component is already registered
     * @param context this should be a conversation object so that the conversation can b stored 
     *                and reset when the component instance is removed
     */
    @Override
    public void addWrapperReference(Object existingContextId, Object contextId) throws TargetResolutionException {
       
        
        // get the instance wrapper via the existing id
        InstanceLifeCycleWrapper existingInstanceWrapper = this.instanceLifecycleCollection.get(existingContextId);
        InstanceLifeCycleWrapper newInstanceWrapper = this.instanceLifecycleCollection.get(contextId);

        // only add the extra reference once
        if (newInstanceWrapper == null) {
            // add the id to the list of ids that the wrapper holds. Used for reference
            // counting and conversation resetting on destruction. 
            existingInstanceWrapper.addCallbackConversation(contextId);

            // add the reference to the collection
            this.instanceLifecycleCollection.put(contextId, existingInstanceWrapper);
        }
    }

    @Override
    public void registerWrapper(InstanceWrapper wrapper, Object contextId) throws TargetResolutionException {
        // if a wrapper for a different instance is already registered for this contextId, remove it
        InstanceLifeCycleWrapper anInstanceWrapper = this.instanceLifecycleCollection.get(contextId);
        if (anInstanceWrapper != null) {
            if (anInstanceWrapper.getInstanceWrapper(contextId).getInstance() != wrapper.getInstance()) {
                remove(contextId);
            } else {
                return;
            }
        }

        anInstanceWrapper = new InstanceLifeCycleWrapper(wrapper, contextId);
        this.instanceLifecycleCollection.put(contextId, anInstanceWrapper);
    }

    // The remove is invoked when a conversation is explicitly ended. This can occur by using the @EndsConversation or API.  
    // In this case the instance is immediately removed. A new conversation will be started on the next operation
    // associated with this conversationId's service reference. 
    //
    @Override
    public void remove(Object contextId) throws TargetDestructionException {
        if (contextId != null) {
            if (this.instanceLifecycleCollection.containsKey(contextId)) {
                InstanceLifeCycleWrapper anInstanceLifeCycleWrapper = this.instanceLifecycleCollection.get(contextId);
                this.instanceLifecycleCollection.remove(contextId);
                anInstanceLifeCycleWrapper.removeInstanceWrapper(contextId);
            }
        }
    }

    /*
     *  This is an inner class that keeps track of the lifecycle of a conversation scoped 
	 *  implementation instance. 
	 * 
	 */

    private class InstanceLifeCycleWrapper {
        private Object clientConversationId;
        private List<Object> callbackConversations = new ArrayList<Object>();

        private InstanceLifeCycleWrapper(Object contextId) throws TargetResolutionException {
            this.clientConversationId = contextId;
            this.createInstance(contextId);
        }

        private InstanceLifeCycleWrapper(InstanceWrapper wrapper, Object contextId) throws TargetResolutionException {
            this.clientConversationId = contextId;
            wrappers.put(contextId, wrapper);
        }


        // Associates a callback conversation with this instance. Each time the scope container
        // is asked to remove an object given a ontextId an associated conversation object will 
        // have its conversationId reset to null. When the list of ids is empty the component instance
        // will be removed from the scope container
        private void addCallbackConversation(Object conversationID) {
            InstanceWrapper ctx = getInstanceWrapper(clientConversationId);
            callbackConversations.add(conversationID);
            wrappers.put(conversationID, ctx);
        }

        //
        // Return the backing implementation instance  
        //
        private InstanceWrapper getInstanceWrapper(Object contextId) {
            InstanceWrapper ctx = wrappers.get(contextId);
            return ctx;
        }

        private void removeInstanceWrapper(Object contextId) throws TargetDestructionException {
            InstanceWrapper ctx = getInstanceWrapper(contextId);
            wrappers.remove(contextId);

            // find out if we are dealing with the original client conversation id
            // and reset accordingly
            if ( ( clientConversationId != null ) && ( clientConversationId.equals(contextId)) ) {
                clientConversationId = null;
            } else {
                // reset the conversationId in the conversation object if present 
                // so that and ending callback causes the conversation in the originating
                // service reference in the client to be reset
                callbackConversations.remove(contextId);
            }

            // stop the component if this removes the last reference
            if (clientConversationId == null && callbackConversations.isEmpty()) {
                ctx.stop();
            }
        }

        private void createInstance(Object contextId) throws TargetResolutionException {
            InstanceWrapper instanceWrapper = createInstanceWrapper();
            instanceWrapper.start();
            wrappers.put(contextId, instanceWrapper);
        }

    }

    /**
	  * @see org.apache.tuscany.sca.core.conversation.ConversationListener#conversationEnded(org.apache.tuscany.sca.core.conversation.ExtendedConversation)
	  */
    public void conversationEnded(ExtendedConversation conversation) {
        try {
            remove(conversation.getConversationID());
        } catch (Exception ex) {
            
        }
    }

    /**
	  * @see org.apache.tuscany.sca.core.conversation.ConversationListener#conversationExpired(org.apache.tuscany.sca.core.conversation.ExtendedConversation)
	  */
    public void conversationExpired(ExtendedConversation conversation) {
    	
    	Object conversationId = conversation.getConversationID();
    	InstanceLifeCycleWrapper ilcw = instanceLifecycleCollection.get(conversationId);
    	if (ilcw != null) {
    		// cycle through all the references to this instance and
    		// remove them from the underlying wrappers collection and
    		// from the lifecycle wrappers collection
    		
    		for (Object conversationID : ilcw.callbackConversations) {
    			try{
        			ilcw.removeInstanceWrapper(conversationID);
    				remove(conversationID);
                } catch(TargetDestructionException tde) {
    				System.out.println("Could not remove conversation id " + conversationID);
    			}
    		}
    			
    		
    		if (ilcw.clientConversationId != null) {
    			try{
        			ilcw.removeInstanceWrapper(ilcw.clientConversationId);
    				remove(ilcw.clientConversationId);
                } catch(TargetDestructionException tde) {
    				System.out.println("Could not remove conversation id " + ilcw.clientConversationId);
    			}
    		}
    		
        }
    	
    }

    /**
	  * @see org.apache.tuscany.sca.core.conversation.ConversationListener#conversationStarted(org.apache.tuscany.sca.core.conversation.ExtendedConversation)
	  */
    public void conversationStarted(ExtendedConversation conversation) {
        startContext(conversation.getConversationID());
    }

    /**
	  * @return the conversationManager
	  */
    public ConversationManager getConversationManager() {
        return conversationManager;
    }

    /**
	  * @param conversationManager the conversationManager to set
	  */
    public void setConversationManager(ConversationManager conversationManager) {
        this.conversationManager = conversationManager;
    }

}