/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.tuscany.sca.binding.erlang.impl;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Map;

import org.apache.tuscany.sca.binding.erlang.ErlangBinding;
import org.apache.tuscany.sca.binding.erlang.impl.types.TypeHelpersProxy;
import org.apache.tuscany.sca.interfacedef.DataType;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.runtime.RuntimeComponentService;

import com.ericsson.otp.erlang.OtpConnection;
import com.ericsson.otp.erlang.OtpErlangAtom;
import com.ericsson.otp.erlang.OtpErlangList;
import com.ericsson.otp.erlang.OtpErlangObject;
import com.ericsson.otp.erlang.OtpErlangPid;
import com.ericsson.otp.erlang.OtpErlangRef;
import com.ericsson.otp.erlang.OtpErlangString;
import com.ericsson.otp.erlang.OtpErlangTuple;

/**
 * Handles one RPC request read from a single {@link OtpConnection}.
 *
 * <p>{@link #run()} receives one message from the connection, extracts the
 * sender pid/ref plus the module, function and arguments, looks the module up
 * in the supplied service/binding maps, invokes the matching operation and
 * sends a reply back to the sender. The connection is always closed when
 * {@code run()} finishes.
 *
 * <p>Every reply has the shape <code>{Ref, {Head, Message}}</code> where
 * {@code Head} is one of the atoms {@code ok}, {@code error} or
 * {@code badrpc}.
 */
public class RpcExecutor implements Runnable {

    private static final OtpErlangAtom OK = new OtpErlangAtom("ok");
    private static final OtpErlangAtom ERROR = new OtpErlangAtom("error");
    private static final OtpErlangAtom BADRPC = new OtpErlangAtom("badrpc");

    // TODO: externalize messages?
    private static final String MODULE_NOT_FOUND =
            "Module not found in SCA component.";
    private static final String OPERATION_NOT_FOUND =
            "Operation name not found in SCA component.";
    private static final String PARAMETERS_MISMATCH =
            "Operation name found in SCA component, but parameters types didn't match.";

    private final Map<String, RuntimeComponentService> services;
    private final Map<String, ErlangBinding> bindings;
    private final OtpConnection connection;

    /**
     * @param services   services keyed by the module name used in requests
     * @param bindings   bindings keyed by the same module name
     * @param connection connection the request is read from and the reply is
     *                   written to; closed at the end of {@link #run()}
     */
    public RpcExecutor(Map<String, RuntimeComponentService> services,
            Map<String, ErlangBinding> bindings, OtpConnection connection) {
        this.bindings = bindings;
        this.services = services;
        this.connection = connection;
    }

    /**
     * Sends <code>{ref, {head, message}}</code> to {@code pid} over the given
     * connection.
     *
     * @throws IOException if the connection fails to send the message
     */
    private void sendMessage(OtpConnection connection, OtpErlangPid pid,
            OtpErlangRef ref, OtpErlangAtom head, OtpErlangObject message)
            throws IOException {
        OtpErlangTuple result = new OtpErlangTuple(new OtpErlangObject[] {
                head, message });
        OtpErlangObject reply = new OtpErlangTuple(new OtpErlangObject[] {
                ref, result });
        connection.send(pid, reply);
    }

    /**
     * Receives one request, dispatches it and replies to the sender.
     *
     * <p>The received message is read as a tuple whose element 1 is
     * <code>{Pid, Ref}</code> of the sender and whose element 2 is the
     * request tuple; elements 1, 2 and 3 of the request are the module atom,
     * the function atom and the arguments. Arguments that are not already a
     * list are wrapped into a single-element list.
     *
     * <p>Any exception is printed and, when the sender is known, reported
     * back with the {@code error} head. The connection is closed in all
     * cases.
     */
    @Override
    public void run() {
        OtpErlangPid senderPid = null;
        OtpErlangRef senderRef = null;
        try {
            OtpErlangTuple call = (OtpErlangTuple) connection.receive();
            OtpErlangTuple from = (OtpErlangTuple) call.elementAt(1);
            OtpErlangTuple request = (OtpErlangTuple) call.elementAt(2);
            senderPid = (OtpErlangPid) from.elementAt(0);
            senderRef = (OtpErlangRef) from.elementAt(1);
            String module = ((OtpErlangAtom) request.elementAt(1)).atomValue();
            String function = ((OtpErlangAtom) request.elementAt(2)).atomValue();
            OtpErlangObject args = request.elementAt(3);
            OtpErlangList argsList = (args instanceof OtpErlangList)
                    ? (OtpErlangList) args
                    : new OtpErlangList(args);
            dispatch(module, function, argsList, senderPid, senderRef);
        } catch (Exception e) {
            // TODO: distinguish and describe errors!
            e.printStackTrace();
            reportUnhandledError(senderPid, senderRef, e);
        } finally {
            connection.close();
        }
    }

    /**
     * Resolves the module and operation and invokes it, replying with
     * {@code ok} on success or {@code badrpc} when the module, the operation
     * or the parameter types do not match. Any other failure is rethrown to
     * the caller.
     */
    private void dispatch(String module, String function,
            OtpErlangList argsList, OtpErlangPid pid, OtpErlangRef ref)
            throws Exception {
        if (!services.containsKey(module)) {
            replyUndefined(module, function, argsList, MODULE_NOT_FOUND, pid, ref);
            return;
        }
        RuntimeComponentService service = services.get(module);
        ErlangBinding binding = bindings.get(module);
        Operation operation = findOperation(service, function);
        if (operation == null) {
            replyUndefined(module, function, argsList, OPERATION_NOT_FOUND, pid, ref);
            return;
        }
        Class<?>[] forClasses = inputClasses(operation);
        try {
            Object result = service.getRuntimeWire(binding).invoke(operation,
                    TypeHelpersProxy.toJavaFromList(argsList, forClasses));
            OtpErlangObject response = toErlangResponse(operation, result);
            sendMessage(connection, pid, ref, OK, response);
        } catch (Exception e) {
            if (!isParameterMismatch(e)) {
                throw e;
            }
            replyUndefined(module, function, argsList, PARAMETERS_MISMATCH, pid, ref);
        }
    }

    /**
     * Returns the first operation of the service's interface whose name
     * equals {@code function}, or {@code null} when there is none.
     */
    private static Operation findOperation(RuntimeComponentService service,
            String function) {
        List<Operation> operations = service.getInterfaceContract()
                .getInterface().getOperations();
        for (Operation candidate : operations) {
            if (candidate.getName().equals(function)) {
                return candidate;
            }
        }
        return null;
    }

    /** Collects the physical classes of the operation's logical input types. */
    private static Class<?>[] inputClasses(Operation operation) {
        List<DataType> inputTypes = operation.getInputType().getLogical();
        Class<?>[] classes = new Class<?>[inputTypes.size()];
        for (int i = 0; i < classes.length; i++) {
            classes[i] = inputTypes.get(i).getPhysical();
        }
        return classes;
    }

    /**
     * Converts an invocation result to its Erlang representation: an empty
     * argument array when the operation declares no output type, a list
     * conversion when the output's physical type is an array, and a
     * single-element argument array otherwise.
     */
    private static OtpErlangObject toErlangResponse(Operation operation,
            Object result) throws Exception {
        if (operation.getOutputType() == null) {
            return TypeHelpersProxy.toErlang(new Object[] {});
        }
        if (operation.getOutputType().getPhysical().isArray()) {
            return TypeHelpersProxy.toErlangAsList(result);
        }
        return TypeHelpersProxy.toErlang(new Object[] { result });
    }

    /**
     * Tells whether {@code e} should be reported as a parameter type
     * mismatch: exactly a {@link TypeMismatchException}, or exactly an
     * {@link InvocationTargetException} whose cause is exactly an
     * {@link IllegalArgumentException}.
     */
    private static boolean isParameterMismatch(Exception e) {
        if (e.getClass().equals(TypeMismatchException.class)) {
            return true;
        }
        if (!e.getClass().equals(InvocationTargetException.class)) {
            return false;
        }
        // getCause() may be null; treating that as "no mismatch" lets the
        // original exception propagate instead of a NullPointerException.
        Throwable cause = e.getCause();
        return cause != null
                && cause.getClass().equals(IllegalArgumentException.class);
    }

    /** Replies with {@code badrpc} and a function-undefined message. */
    private void replyUndefined(String module, String function,
            OtpErlangList argsList, String reason, OtpErlangPid pid,
            OtpErlangRef ref) throws IOException {
        OtpErlangObject errorMsg = MessageHelper.functionUndefMessage(module,
                function, argsList, reason);
        sendMessage(connection, pid, ref, BADRPC, errorMsg);
    }

    /**
     * Best-effort report of an unexpected failure to the sender using the
     * {@code error} head. Does nothing when the sender could not be
     * determined, since a reply cannot be addressed without pid and ref.
     */
    private void reportUnhandledError(OtpErlangPid pid, OtpErlangRef ref,
            Exception e) {
        if (pid == null || ref == null) {
            // The request failed before the sender was extracted; building
            // the reply tuple with a null element would itself throw.
            return;
        }
        try {
            sendMessage(connection, pid, ref, ERROR, new OtpErlangString(
                    "Unhandled error while processing request: "
                            + e.getClass().getCanonicalName()
                            + ", message: " + e.getMessage()));
        } catch (IOException ignored) {
            // error while sending error message. Can't do anything now
        }
    }
}