同步转异步+RPC的一个POS行业应用-业务模型介绍
- - 行业应用 - ITeye博客最近在做一个挺有意思的POS消费项目,工作量不太大,但涉及的技术运用还挺有意思的. 可能有人奇怪,POS项目怎么用到JAVA语言了,先来简单介绍下这个项目背景:. 改造前:收银机下单,POS机下单并刷卡支付. 改造后:收银机跟POS连线,收银台直接下单并触发POS刷卡支付动作. 这里就涉及一个关键问题,POS机只能单线程工作,就是一个时刻只能干一件事情,比如打印,刷卡,跟卡主机通讯,都必须是一件件做.
简单回顾下业务模型:收银台<=>POS代理服务器<=>POS机,三者之间进行通讯,POS代理作为一个报文转换和同步转异步角色。
下面介绍下几个关键技术实现:
1、报文
这里的报文,指的是POS代理服务器跟POS通讯之间约定的报文。根据POS交易类型,支付、刷卡、打印等,约定每个交易报文包含什么字段信息和长度,其中一个比较特别字段是UUID,这个字段是每个报文的关键字段,
具有唯一性,每个报文都不同,主要用来实现同步转异步中,POS返回数据给代理服务器后找回原来发送指令的channel,并最终把转换后的数据发送给收银台。
之所以要找到原来的channel,是因为同步转异步的过程中,channel是被临时保存起来的。
2、同步转异步关键代码
public class PosResponseFuture<I> { private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); private String uuid;//消息序列号 //psoresponse使用 private final static Map<String, PosResponseFuture> futures = new ConcurrentHashMap<String, PosResponseFuture>(); private final static Object synLock = new Object(); public I write2pos(boolean broadcastFlag,MsgRequest msg) throws PosConnException,TimeOutException,TryLaterException { synchronized(synLock) { long st = System.currentTimeMillis(); lock.lock(); try { this.uuid = msg.getId(); futures.put(this.uuid, this);//把当前调用环境保存起来 //向pos发送消息 log.debug("向POS发送消息:{}",msg); PosIntContext.write2pos(msg); int timeout = PosIntContext.getApiTimeout(); if (msg.getTimeout()!=-1) { timeout = msg.getTimeout(); log.debug("超时设置:{}",timeout); } //这里是同步转异步关键 //程序执行到这里,一直处于阻塞状态,直到POS返回 //这里还设置了一个超时时间,避免POS出现故障,导致调用一直在等待 done.await(timeout,TimeUnit.SECONDS); if (!isDone()) { throw new TimeOutException("超时("+timeout+"秒)"); } } catch (InterruptedException e) { log.error("write2pos InterruptedException: "+e.getMessage()); throw new PosConnException(e); } catch (TimeOutException e) { throw e; } catch (PosConnException e) { throw e; } catch (TryLaterException e) { throw e; } finally { this.release(); lock.unlock(); } long en = System.currentTimeMillis(); log.debug("{} 执行时间:{}",msg.toString(),(en-st)); //POS执行完成,正常返回 if (response instanceof MsgResponse) { return (I)response; } return null; } } /** * pos返回消息回调 * @Title: received * @Description: TODO * @param @param response * @return void * @throws */ public static void received(MsgResponse response) { //用主键取回调用环境 PosResponseFuture<?> future = futures.remove(response.getId()); if (future != null) { future.doReceived(response); } } /** * 检测返回值 * @Title: isDone * @Description: TODO * @param @return * @return boolean * @throws */ private boolean isDone() { return this.response != null;//null代表超时 } /** * 接受到返回 * @Title: doReceived * @Description: TODO * @param @param response * @return void * @throws */ private void doReceived(MsgResponse response) { lock.lock();//同步控制,线程安全 try { this.response = response; done.signal();//notify,通知线程往下执行 } finally { lock.unlock(); } } /** * 释放资源 * @Title: release * @Description: TODO * @param * @return void * @throws */ private void release() { PosResponseFuture<I> tmp = futures.remove(this.uuid); if (tmp!=null) { log.debug("释放资源:{}",new Object[]{this.uuid,tmp.getProcessMsg()}); } else { log.debug("释放资源:NULL!"); } } public static void main(String args[]) { } }
3、POS代理服务器暴露RPC调用接口关键代码
public class Client { //这个代码包含了rpc调用的核心 @SuppressWarnings("unchecked") public <T> T getProxy(Class<T> interfaceClass,final String host,final int port) { return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass }, new InvocationHandler() { //其实就是一个AOP拦截 public Object invoke(Object proxy, Method method,Object[] arguments) throws Throwable { Socket socket = null; ObjectOutputStream output = null; ObjectInputStream input = null; try { //把需要调用的类、方法和参数,序列化传输到RPC服务器 //等待远端调用完成返回结果 socket = new Socket(host, port); output = new ObjectOutputStream(socket.getOutputStream()); output.writeUTF(method.getName()); output.writeObject(method.getParameterTypes()); output.writeObject(arguments); input = new ObjectInputStream(socket.getInputStream()); return input.readObject(); } catch(Exception e) { throw e; } finally { if (socket!=null) { socket.close(); } if (output!=null) { output.close(); } if (input!=null) { input.close(); } } } }); } public static void main(String args[]) { HelloService helloService = new Client().getProxy(HelloService.class,"localhost",8080); long st = System.currentTimeMillis(); for (int i=0; i<1; i++) { System.out.println(i+"> "+helloService.sayHello("哈哈")); } long en = System.currentTimeMillis(); System.out.println("耗时:"+(en-st)); } } public class Server { private int port = 8888; public void rpcServer() throws Exception { ServerSocket server = null; try { server = new ServerSocket(port); for(;;) { final Socket socket = server.accept(); System.out.println(socket.getRemoteSocketAddress()); new Thread(new Runnable() { @Override public void run() { ObjectOutputStream output = null; ObjectInputStream input = null; try { input = new ObjectInputStream(socket.getInputStream());//接受rpc client请求 String methodName = input.readUTF();//调用方法名 Class<?>[] parameterTypes = (Class<?>[])input.readObject(); Object[] arguments = (Object[])input.readObject();//调用参数 output = new ObjectOutputStream(socket.getOutputStream()); Method method = new HelloServiceImp().getClass().getMethod(methodName, parameterTypes); Object result = method.invoke(new HelloServiceImp(), arguments);//执行调用 output.writeObject(result);//回写结果 } catch(Exception e) { e.printStackTrace(); } finally { try { if (output!=null) { output.close(); } if (input!=null) { input.close(); } } catch(Exception e) { } } } }).start(); } } catch(Exception e) { throw e; } finally { if (server!=null) { server.close(); } } } public static void main(String args[]) throws Exception { new Server().rpcServer(); } } public interface HelloService { public String sayHello(String input); } public class HelloServiceImp implements HelloService { @Override public String sayHello(String input) { return input + " wellcome."; } }