/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.tuscany.sca.binding.erlang.impl;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Map;

import org.apache.tuscany.sca.binding.erlang.ErlangBinding;
import org.apache.tuscany.sca.binding.erlang.impl.types.TypeHelpersProxy;
import org.apache.tuscany.sca.interfacedef.DataType;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;

import com.ericsson.otp.erlang.OtpConnection;
import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangList;
import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangPid;
import com.ericsson.otp.erlang.OtpErlangRef;
import com.ericsson.otp.erlang.OtpErlangString;
import com.ericsson.otp.erlang.OtpErlangTuple;
import com.ericsson.otp.erlang.OtpMbox;
import com.ericsson.otp.erlang.OtpMsg;
import com.ericsson.otp.erlang.OtpNode;

/**
 * Receives a single message from an Erlang connection and dispatches it to
 * the matching SCA service operation.
 * <p>
 * A message addressed to {@code MessageHelper.RPC_MBOX} is treated as an RPC
 * call (module / function / arguments) and is always answered on the same
 * connection. Any other message is treated as a plain mbox message: the
 * operation is selected by trying to convert the payload to each candidate
 * operation's parameter types, and a reply is only sent when the operation
 * declares an output type.
 * <p>
 * The connection is closed when {@link #run()} finishes.
 *
 * @version $Rev$ $Date$
 */
public class ServiceExecutor implements Runnable {

    /** Timeout passed to {@code OtpConnection.receiveMsg}. */
    private static final long RECEIVE_TIMEOUT = 60000;

    /** RPC targets, keyed by Erlang module name. */
    private Map<String, ErlangNodeElement> erlangModules;

    /** Service/binding pair used for non-RPC (mbox) messages. */
    private ErlangNodeElement erlangMbox;

    /** Connection the request is read from (and RPC replies are written to). */
    private OtpConnection connection;

    /** Candidate operations for mbox messages, keyed by recipient (mbox) name. */
    private Map<String, List<Operation>> groupedOperations;

    /**
     * @param connection        connection to receive the message from
     * @param groupedOperations operations grouped by mbox name
     * @param erlangModules     RPC-enabled elements keyed by module name
     * @param erlangMbox        element serving plain mbox messages
     */
    public ServiceExecutor(OtpConnection connection,
                           Map<String, List<Operation>> groupedOperations,
                           Map<String, ErlangNodeElement> erlangModules,
                           ErlangNodeElement erlangMbox) {
        this.erlangModules = erlangModules;
        this.connection = connection;
        this.groupedOperations = groupedOperations;
        this.erlangMbox = erlangMbox;
    }

    /**
     * Sends {@code {ref, {head, message}}} to {@code pid} over the given
     * connection.
     *
     * @throws IOException if the connection fails to send
     */
    private void sendMessage(OtpConnection connection, OtpErlangPid pid,
                             OtpErlangRef ref, OtpErlangAtom head,
                             OtpErlangObject message) throws IOException {
        OtpErlangTuple tResult = new OtpErlangTuple(new OtpErlangObject[] {head, message});
        OtpErlangObject msg = new OtpErlangTuple(new OtpErlangObject[] {ref, tResult});
        connection.send(pid, msg);
    }

    /**
     * Collects the physical (Java) class of every input parameter of the
     * operation, in declaration order.
     */
    private static Class<?>[] inputClasses(Operation operation) {
        List<DataType> iTypes = operation.getInputType().getLogical();
        Class<?>[] forClasses = new Class<?>[iTypes.size()];
        for (int i = 0; i < iTypes.size(); i++) {
            forClasses[i] = iTypes.get(i).getPhysical();
        }
        return forClasses;
    }

    /**
     * Returns the first operation whose name equals {@code name}, or
     * {@code null} when there is none.
     */
    private static Operation findOperation(List<Operation> operations, String name) {
        for (Operation candidate : operations) {
            if (candidate.getName().equals(name)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Handles an RPC style message.
     * <p>
     * The message is read as a tuple whose element 1 is {@code {Pid, Ref}} of
     * the caller and whose element 2 is the request tuple carrying module
     * (element 1), function (element 2) and arguments (element 3). The reply
     * is tagged with {@code ATOM_OK}, {@code ATOM_BADRPC} (unknown module,
     * unknown function or mismatching parameter types) or {@code ATOM_ERROR}
     * (anything else).
     */
    private void handleRpc(OtpMsg msg) {
        OtpErlangTuple request = null;
        OtpErlangPid senderPid = null;
        OtpErlangRef senderRef = null;
        try {
            OtpErlangTuple call = (OtpErlangTuple) msg.getMsg();
            OtpErlangTuple from = (OtpErlangTuple) call.elementAt(1);
            request = (OtpErlangTuple) call.elementAt(2);
            senderPid = (OtpErlangPid) from.elementAt(0);
            senderRef = (OtpErlangRef) from.elementAt(1);
            String module = ((OtpErlangAtom) request.elementAt(1)).atomValue();
            String function = ((OtpErlangAtom) request.elementAt(2)).atomValue();
            OtpErlangObject args = request.elementAt(3);

            // normalize input: a single non-list argument becomes a one element list
            OtpErlangList argsList;
            if (args instanceof OtpErlangList) {
                argsList = (OtpErlangList) args;
            } else {
                argsList = new OtpErlangList(args);
            }

            if (!erlangModules.containsKey(module)) {
                // TODO: externalize message?
                OtpErlangObject errorMsg = MessageHelper.functionUndefMessage(
                        module, function, argsList,
                        "Module not found in SCA component.");
                sendMessage(connection, senderPid, senderRef,
                        MessageHelper.ATOM_BADRPC, errorMsg);
                return;
            }

            ErlangNodeElement element = erlangModules.get(module);
            RuntimeComponentService service = element.getService();
            ErlangBinding binding = element.getBinding();
            List<Operation> operations =
                    service.getInterfaceContract().getInterface().getOperations();
            Operation operation = findOperation(operations, function);

            if (operation == null) {
                // TODO: externalize message?
                OtpErlangObject errorMsg = MessageHelper.functionUndefMessage(
                        module, function, argsList,
                        "Operation name not found in SCA component.");
                sendMessage(connection, senderPid, senderRef,
                        MessageHelper.ATOM_BADRPC, errorMsg);
                return;
            }

            Class<?>[] forClasses = inputClasses(operation);
            try {
                Object result = service.getRuntimeWire(binding).invoke(
                        operation,
                        TypeHelpersProxy.toJavaFromList(argsList, forClasses));
                OtpErlangObject response;
                if (operation.getOutputType() != null
                        && operation.getOutputType().getPhysical().isArray()) {
                    response = TypeHelpersProxy.toErlangAsList(result);
                } else if (operation.getOutputType() == null) {
                    // no declared output: reply with the conversion of an empty array
                    response = TypeHelpersProxy.toErlang(new Object[] {});
                } else {
                    response = TypeHelpersProxy.toErlang(new Object[] {result});
                }
                sendMessage(connection, senderPid, senderRef,
                        MessageHelper.ATOM_OK, response);
            } catch (Exception e) {
                // The cause may be absent; guard it so a missing cause does not
                // turn into a NullPointerException that hides the real failure.
                Throwable cause = e.getCause();
                boolean illegalArgument =
                        e.getClass().equals(InvocationTargetException.class)
                        && cause != null
                        && cause.getClass().equals(IllegalArgumentException.class);
                boolean typeMismatch =
                        e.getClass().equals(TypeMismatchException.class);
                if (illegalArgument || typeMismatch) {
                    // TODO: externalize message?
                    OtpErlangObject errorMsg = MessageHelper.functionUndefMessage(
                            module, function, argsList,
                            "Operation name found in SCA component, but parameters types didn't match.");
                    sendMessage(connection, senderPid, senderRef,
                            MessageHelper.ATOM_BADRPC, errorMsg);
                } else {
                    // handled by the outer catch, which reports ATOM_ERROR
                    throw e;
                }
            }
        } catch (Exception e) {
            // TODO: distinguish and describe errors!
            e.printStackTrace();
            if (senderPid == null || senderRef == null) {
                // The request could not be parsed far enough to know who sent
                // it, so there is nobody to send the error to.
                return;
            }
            try {
                sendMessage(connection, senderPid, senderRef,
                        MessageHelper.ATOM_ERROR,
                        new OtpErlangString(
                                "Unhandled error while processing request: "
                                        + e.getClass().getCanonicalName()
                                        + ", message: " + e.getMessage()));
            } catch (IOException e1) {
                // error while sending error message. Can't do anything now
            }
        }
    }

    /**
     * Handles a plain (non-RPC) mbox message.
     * <p>
     * The operations registered under the recipient name are tried in order;
     * the first one whose parameter types accept the payload is invoked. If
     * the operation declares an output type, the converted result is sent
     * back to the sender's pid through a temporary node.
     */
    private void handleMsg(OtpMsg msg) {
        List<Operation> operations = groupedOperations.get(msg.getRecipientName());
        if (operations == null) {
            // TODO: no such mbox, send error message?
            return;
        }

        Operation matchedOperation = null;
        Object[] args = null;
        for (Operation operation : operations) {
            Class<?>[] forClasses = inputClasses(operation);
            try {
                args = TypeHelpersProxy.toJavaAsArgs(msg.getMsg(), forClasses);
                matchedOperation = operation;
                break;
            } catch (Exception e) {
                // conversion failed for this candidate: report and try the next one
                e.printStackTrace();
            }
        }

        if (matchedOperation == null) {
            // TODO: send some error - no mapping for such arguments
            System.out
                    .println("TODO: send some error - no mapping for such arguments");
            return;
        }

        try {
            Object result = erlangMbox.getService()
                    .getRuntimeWire(erlangMbox.getBinding())
                    .invoke(matchedOperation, args);
            OtpErlangObject response = null;
            if (matchedOperation.getOutputType() != null
                    && matchedOperation.getOutputType().getPhysical().isArray()) {
                response = TypeHelpersProxy.toErlangAsList(result);
            } else if (matchedOperation.getOutputType() != null) {
                response = TypeHelpersProxy.toErlang(new Object[] {result});
            }
            if (response != null) {
                OtpNode node = new OtpNode("_response_connector_to_"
                        + msg.getSenderPid());
                try {
                    OtpMbox mbox = node.createMbox();
                    mbox.send(msg.getSenderPid(), response);
                } finally {
                    // The node exists only to deliver this one reply; release it
                    // so nodes do not accumulate with every answered message.
                    // NOTE(review): assumes send() has handed the message off
                    // before returning - confirm against JInterface.
                    node.close();
                }
            }
        } catch (InvocationTargetException e) {
            // TODO send some error?
            e.printStackTrace();
        } catch (IOException e) {
            // node creation failed
            e.printStackTrace();
        }
    }

    /**
     * Receives one message (waiting up to {@link #RECEIVE_TIMEOUT}), routes
     * it to the RPC or mbox handler and finally closes the connection.
     */
    public void run() {
        try {
            OtpMsg msg = connection.receiveMsg(RECEIVE_TIMEOUT);
            if (msg == null) {
                // message receive timeout
            } else if (MessageHelper.RPC_MBOX.equals(msg.getRecipientName())) {
                // constant-first comparison: the recipient name may be absent
                handleRpc(msg);
            } else {
                handleMsg(msg);
            }
        } catch (Exception e) {
            // TODO: log, send error?
            e.printStackTrace();
        } finally {
            connection.close();
        }
    }
}