summaryrefslogtreecommitdiffstats
path: root/sca-java-2.x/tags/2.0.1-RC1/modules/core/src/main/java/org/apache/tuscany/sca/core/context/impl/CallbackServiceReferenceImpl.java
blob: 4c206ec5dbc85376fde1cc8b20ebd1ab86a4da06 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.    
 */
package org.apache.tuscany.sca.core.context.impl;

import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.tuscany.sca.assembly.Binding;
import org.apache.tuscany.sca.assembly.Endpoint;
import org.apache.tuscany.sca.assembly.EndpointReference;
import org.apache.tuscany.sca.assembly.SCABinding;
import org.apache.tuscany.sca.assembly.builder.BindingBuilder;
import org.apache.tuscany.sca.assembly.builder.BuilderContext;
import org.apache.tuscany.sca.context.CompositeContext;
import org.apache.tuscany.sca.context.ThreadMessageContext;
import org.apache.tuscany.sca.core.invocation.CallbackHandler;
import org.apache.tuscany.sca.core.invocation.Constants;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
import org.oasisopen.sca.ServiceRuntimeException;

/**
 * Represent the reference to the callback service. This class basically wraps a Tuscany EndpointReference. 
 * The callback EPR is selected based on the 3 basic scenarios we are trying to cater for:
 * 
 * A/ <component name="MyComponent">
 *        <service name="MyService">
 *            blank OR <binding.sca/> 
 *        </service>
 *    </component>
 *    
 * B/ <component name="MyComponent">
 *        <service name="MyService">
 *            <binding.someremotebinding/> 
 *        </service>
 *    </component>
 *    
 * C/ <component name="MyComponent">
 *        <service name="MyService">
 *            some binding
 *            <callback>
 *                <binding.someremotebinding/> 
 *            </callback>
 *        </service>
 *    </component>    
 *    
 *  A - the callback binding will default to binding.sca and the expectation is that 
 *      the callback endpoint will be established by looking it up in the registry
 *      hence the forward call must contain the SCA target name referring to the 
 *      callback service
 *      
 *  B - the callback binding defaults to be the forward binding taking all of its
 *      configuration. The callback target URI is taken from the forward message and 
 *      put into the callback binding URI
 *      
 *  C - the callback binding is as specified by the user. If the user has not specified
 *      a binding URI then the URI from the forward message will be placed in the 
 *      callback binding URI. This may or may not lead to happiness depending on whether
 *      the forward and callback bindings are compatible
 *      
 *  The callback proxy, and this class, is instantiated whenever a new callback proxy is 
 *  required as follows:
 *  
 *    If the service component implementation is STATELESS then each incoming message  
 *    creates a new service instance and hence a new set of callback proxies
 *    
 *    If the service component implementation is COMPOSITE then only a single instance
 *    of the component implementation will exist and the callback proxy will be retrieved
 *    via the RequestContext. 
 *   
 *  Following the Tuscany runtime model for normal references we don't cache callback 
 *  proxies across component implementation instances. Hence there will be one
 *  instance of this class for each callback proxy, however created, and the class
 *  will refer to a single callback service. To put it another way, messages from 
 *  multiple clients (presenting different callback services) will be called back to
 *  via different callback proxies and hence a single instance of this class will 
 *  not be required to handle more than one callback address.  
 * 
 */
public class CallbackServiceReferenceImpl<B> extends ServiceReferenceImpl<B> {
    private static final Logger logger = Logger.getLogger(CallbackServiceReferenceImpl.class.getName());

    // The callback endpoint reference selected for this proxy. It may be a clone of
    // one of the candidates when the callback handler asks for the wire to be cloned.
    private RuntimeEndpointReference callbackEPR;

    // The candidate callback endpoint references passed to the constructor
    private List<? extends EndpointReference> callbackEPRs;

    // During construction: the callback endpoint carried by the forward message (if any).
    // After construction: the target endpoint of the selected callback EPR.
    private Endpoint resolvedEndpoint;

    // Holds the ID of the Message that caused the creation of this CallbackServiceReference
    private String msgID;

    // Holds the callback handler, which carries the URI of the target callback service
    // taken from the Message that caused the creation of this CallbackServiceReference
    private CallbackHandler callbackHandler;

    /*
     * Public constructor for Externalizable serialization/deserialization
     * TODO - we need to serialize the msgID and callbackURI
     */
    public CallbackServiceReferenceImpl() {
        super();
    }

    /**
     * Creates a callback service reference from the message currently held by the
     * {@link ThreadMessageContext}.
     * <p>
     * The constructor captures the message ID and the callback address from that
     * forward message, selects the callback EPR whose binding matches the binding
     * the forward message was addressed to, and, when a callback URI is available,
     * configures the selected EPR with it.
     *
     * @param interfaze the callback interface
     * @param callbackEPRs the candidate callback endpoint references; a null list is
     *                     treated like an empty one
     * @throws ServiceRuntimeException if the forward message has no destination
     *                                 endpoint or no candidate callback EPR matches it
     */
    public CallbackServiceReferenceImpl(Class<B> interfaze,
                                        List<? extends EndpointReference> callbackEPRs) {
        super(interfaze, null, getCompositeContext(callbackEPRs));
        this.callbackEPRs = callbackEPRs;

        Message msgContext = ThreadMessageContext.getMessageContext();

        // Capture the Message ID from the message which caused the creation of this
        // CallbackServiceReference
        this.msgID = (String) msgContext.getHeaders().get(Constants.MESSAGE_ID);

        // Capture the callback URI from the message which caused the creation of this
        // CallbackServiceReference. This code is more complex than it needs to be for the
        // time being to cater for bindings that still use the approach of constructing a
        // callback endpoint to model the callback URI. With these changes the binding can
        // just set a CallbackHandler in the forward message to get the same effect. Some
        // bindings don't do that, hence the various checks.
        if (msgContext.getFrom() != null) {
            resolvedEndpoint = msgContext.getFrom().getCallbackEndpoint();
        }

        if (resolvedEndpoint != null) {
            // A callback endpoint with no binding, or with the SCA binding, is addressed
            // by the endpoint URI; any other binding is addressed by the binding URI.
            if (resolvedEndpoint.getBinding() == null
                || resolvedEndpoint.getBinding().getType().equals(SCABinding.TYPE)) {
                this.callbackHandler = new CallbackHandler(resolvedEndpoint.getURI());
            } else {
                this.callbackHandler = new CallbackHandler(resolvedEndpoint.getBinding().getURI());
            }
        } else {
            this.callbackHandler = (CallbackHandler) msgContext.getHeaders().get(Constants.CALLBACK);

            if (callbackHandler == null) {
                // No callback information in the forward message at all
                this.callbackHandler = new CallbackHandler(null);
            }
        }

        // Only build the log message when it will actually be emitted
        if (logger.isLoggable(Level.FINE)) {
            if (callbackHandler.getCallbackTargetURI() != null) {
                logger.log(Level.FINE, "Selecting callback EPR using address from forward message: " + callbackHandler.getCallbackTargetURI());
            } else {
                logger.log(Level.FINE, "Selecting callback EPR using address but callback URI is null");
            }
        }

        // Work out which callback EPR to use
        callbackEPR = selectCallbackEPR(msgContext);
        if (callbackEPR == null) {
            throw new ServiceRuntimeException("No callback binding found for " + msgContext.getTo().toString());
        }

        // configure the callback EPR with the callback address
        if (callbackHandler.getCallbackTargetURI() != null) {
            callbackEPR = setCallbackAddress(callbackEPR);
        }

        this.resolvedEndpoint = callbackEPR.getTargetEndpoint();
    }

    /**
     * Gets the callback handler captured from (or created for) the forward message.
     *
     * @return the callback handler
     */
    public CallbackHandler getCallbackHandler() {
        return callbackHandler;
    }

    /**
     * Gets the message ID associated with this callback reference. All calls through the proxy backed by
     * this CallbackServiceReference will use the same msgID
     *
     * @return the message ID
     */
    public String getMsgID() {
        return msgID;
    }

    /**
     * Gets the composite context of the first candidate callback EPR.
     *
     * @param callbackEPRs the candidate callback endpoint references
     * @return the composite context of the first EPR, or null if the list is null or empty
     */
    private static CompositeContext getCompositeContext(List<? extends EndpointReference> callbackEPRs) {
        if (callbackEPRs == null || callbackEPRs.isEmpty()) {
            return null;
        }
        RuntimeEndpointReference epr = (RuntimeEndpointReference) callbackEPRs.get(0);
        return epr.getCompositeContext();
    }

    /**
     * Creates the callback proxy for this reference using the proxy factory.
     */
    @Override
    protected B createProxy() throws Exception {
        return proxyFactory.createCallbackProxy(this);
    }

    /**
     * Gets the callback endpoint reference selected for this callback reference.
     *
     * @return the selected callback EPR
     */
    public RuntimeEndpointReference getCallbackEPR() {
        return callbackEPR;
    }

    /**
     * Gets the target endpoint of the selected callback EPR.
     *
     * @return the resolved endpoint, which may be null
     */
    public Endpoint getResolvedEndpoint() {
        return resolvedEndpoint;
    }

    /**
     * Selects the callback EPR matching the binding the forward message was sent to.
     * A candidate whose binding has the same name as the forward binding wins;
     * failing that, the first candidate whose binding has the same type is used.
     *
     * @param msgContext the forward message
     * @return the matching callback EPR, or null if no candidate matches
     * @throws ServiceRuntimeException if the forward message has no destination endpoint
     */
    private RuntimeEndpointReference selectCallbackEPR(Message msgContext) {

        Endpoint to = msgContext.getTo();
        if (to == null) {
            //FIXME: need better exception
            throw new ServiceRuntimeException("Destination for forward call is not available");
        }

        // no candidates at all, so nothing can match
        if (callbackEPRs == null) {
            return null;
        }

        // look for callback binding with same name as service binding
        for (EndpointReference epr : callbackEPRs) {
            // an unnamed callback binding cannot match by name; let it fall
            // through to the match by type below rather than fail with an NPE
            String candidateName = epr.getBinding().getName();
            if (candidateName != null && candidateName.equals(to.getBinding().getName())) {
                return (RuntimeEndpointReference) epr;
            }
        }

        // if no match, look for callback binding with same type as service binding
        for (EndpointReference epr : callbackEPRs) {
            if (epr.getBinding().getType().equals(to.getBinding().getType())) {
                return (RuntimeEndpointReference) epr;
            }
        }

        // no suitable callback wire was found
        return null;
    }

    /**
     * Configures a callback EPR with the callback address held by the callback handler.
     * The EPR is cloned first when the callback handler requests it; otherwise the
     * EPR passed in is modified in place.
     *
     * @param endpointReference the selected callback EPR
     * @return the configured EPR (either the argument itself or its clone)
     * @throws ServiceRuntimeException if the EPR cannot be cloned
     */
    private RuntimeEndpointReference setCallbackAddress(RuntimeEndpointReference endpointReference) {
        try {

            RuntimeEndpointReference epr = endpointReference;

            if (callbackHandler.getCloneCallbackWire()) {
                epr = (RuntimeEndpointReference) endpointReference.clone();
            }

            // TUSCANY-3932
            // If it's the default binding then we're going to look the callback endpoint
            // up in the registry. Most remote protocols, which may be used as delegates
            // for binding.sca, will expect to deal with absolute URLs so flip the
            // callback endpoint back to force the lookup to happen
            if (epr.getBinding().getType().equals(SCABinding.TYPE)) {
                // A/ create a callback endpoint to allow the
                //    callback lookup to take place
                epr.setStatus(EndpointReference.Status.WIRED_TARGET_NOT_FOUND);

                // if an endpoint is provided in the forward message use it,
                // if not create one
                if (resolvedEndpoint == null) {
                    RuntimeEndpoint callbackEndpoint = (RuntimeEndpoint) assemblyFactory.createEndpoint();
                    callbackEndpoint.setURI(callbackHandler.getCallbackTargetURI());
                    callbackEndpoint.setUnresolved(true);
                    epr.setTargetEndpoint(callbackEndpoint);
                } else {
                    epr.setTargetEndpoint(resolvedEndpoint);
                }
            } else {
                // B/ and C/ assume that the callback EPR is already resolved
                //           and set the binding URI if one is provided with the
                //           forward message. Some bindings may want to do other
                //           things to determine the callback URI, so the
                //           CallbackHandler will be sent in the callback message
                //           header. This is particularly true if the clone isn't
                //           called above because resetting the URI will not
                //           be thread safe.
                epr.setStatus(EndpointReference.Status.RESOLVED_BINDING);

                // when no URI is provided, do nothing and rely on whatever the
                // user has configured in the SCDL
                if (callbackHandler.getCallbackTargetURI() != null) {
                    epr.getBinding().setURI(callbackHandler.getCallbackTargetURI());
                }
            }

            return epr;
        } catch (CloneNotSupportedException e) {
            // will not happen
            throw new ServiceRuntimeException(e);
        }
    }
}