同步转异步+RPC的一个POS行业应用-关键技术实现

标签: 同步 异步 rpc | 发表时间:2015-01-18 07:12 | 作者:leobasic
分享到:
出处:http://www.iteye.com



简单回顾下业务模型:收银台<=>POS代理服务器<=>POS机,三者之间进行通讯,POS代理作为一个报文转换和同步转异步角色。

下面介绍下几个关键技术实现:

1、报文

这里的报文,指的是POS代理服务器跟POS通讯之间约定的报文。根据POS交易类型,支付、刷卡、打印等,约定每个交易报文包含什么字段信息和长度,其中一个比较特别字段是UUID,这个字段是每个报文的关键字段,
具有唯一性,每个报文都不同,主要用来实现同步转异步中,POS返回数据给代理服务器后找回原来发送指令的channel,并最终把转换后的数据发送给收银台。

之所以要找到原来的channel,是因为同步转异步的过程中,channel是被临时保存起来的。


2、同步转异步关键代码

public class PosResponseFuture<I> {


private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();

private String uuid;//消息序列号

//psoresponse使用
private final static Map<String, PosResponseFuture> futures = new ConcurrentHashMap<String, PosResponseFuture>();
private final static Object synLock = new Object();

public I write2pos(boolean broadcastFlag,MsgRequest msg) throws PosConnException,TimeOutException,TryLaterException {


synchronized(synLock)
{
long st = System.currentTimeMillis();

lock.lock();
try {
this.uuid = msg.getId();
futures.put(this.uuid, this);//把当前调用环境保存起来

//向pos发送消息
log.debug("向POS发送消息:{}",msg);

PosIntContext.write2pos(msg);

int timeout = PosIntContext.getApiTimeout();
if (msg.getTimeout()!=-1)
{
timeout = msg.getTimeout();
log.debug("超时设置:{}",timeout);
}
//这里是同步转异步关键
//程序执行到这里,一直处于阻塞状态,直到POS返回
//这里还设置了一个超时时间,避免POS出现故障,导致调用一直在等待
done.await(timeout,TimeUnit.SECONDS);
if (!isDone())
{
throw new TimeOutException("超时("+timeout+"秒)");
}

} catch (InterruptedException e) {
log.error("write2pos InterruptedException: "+e.getMessage());
throw new PosConnException(e);
} catch (TimeOutException e) {
throw e;
} catch (PosConnException e) {
throw e;
} catch (TryLaterException e) {
throw e;
}
finally {
this.release();
lock.unlock();
}

long en = System.currentTimeMillis();

log.debug("{} 执行时间:{}",msg.toString(),(en-st));
//POS执行完成,正常返回
if (response instanceof MsgResponse)
{
return (I)response;
}

return null;
}

}

/**
* pos返回消息回调
* @Title: received
* @Description: TODO
* @param @param response
* @return void
* @throws
*/
public static void received(MsgResponse response) {
//用主键取回调用环境
PosResponseFuture<?> future = futures.remove(response.getId());

if (future != null) {
future.doReceived(response);
}

}

/**
* 检测返回值
* @Title: isDone
* @Description: TODO
* @param @return
* @return boolean
* @throws
*/
private boolean isDone() {
return this.response != null;//null代表超时
}

/**
* 接受到返回
* @Title: doReceived
* @Description: TODO
* @param @param response
* @return void
* @throws
*/
private void doReceived(MsgResponse response) {
lock.lock();//同步控制,线程安全
try {
this.response = response;
done.signal();//notify,通知线程往下执行
} finally {
lock.unlock();
}
}

/**
* 释放资源
* @Title: release
* @Description: TODO
* @param
* @return void
* @throws
*/
private void release()
{
PosResponseFuture<I> tmp = futures.remove(this.uuid);
if (tmp!=null)
{

log.debug("释放资源:{}",new Object[]{this.uuid,tmp.getProcessMsg()});
}
else
{
log.debug("释放资源:NULL!");
}
}

public static void main(String args[])
{
}

}

 



3、POS代理服务器暴露RPC调用接口关键代码

public class Client {
	
	//这个代码包含了rpc调用的核心
	@SuppressWarnings("unchecked")
	public <T> T getProxy(Class<T> interfaceClass,final  String host,final  int port) {
		return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
				new Class<?>[] { interfaceClass }, new InvocationHandler() {
					//其实就是一个AOP拦截
					public Object invoke(Object proxy, Method method,Object[] arguments) throws Throwable {
						
						Socket socket = null;
						ObjectOutputStream output = null;
						ObjectInputStream input = null;
						
						try
						{
							//把需要调用的类、方法和参数,序列化传输到RPC服务器
							//等待远端调用完成返回结果
							socket = new Socket(host, port);
							output = new ObjectOutputStream(socket.getOutputStream());
							output.writeUTF(method.getName());
							output.writeObject(method.getParameterTypes());
							output.writeObject(arguments);
							
							input = new ObjectInputStream(socket.getInputStream());
							
							return input.readObject();
						}
						catch(Exception e)
						{
							throw e;
						}
						finally
						{
							if (socket!=null)
							{
								socket.close();
							}
							if (output!=null)
							{
								output.close();
							}
							if (input!=null)
							{
								input.close();
							}
						}
						
					}
					
				});
	}


	public static void main(String args[])
	{
		HelloService helloService = new Client().getProxy(HelloService.class,"localhost",8080);
		
		long st = System.currentTimeMillis();
		for (int i=0; i<1; i++)
		{
			System.out.println(i+"> "+helloService.sayHello("哈哈"));
		}
		long en = System.currentTimeMillis();
		System.out.println("耗时:"+(en-st));
	}

}



public class Server {

	private int port = 8888;
	
	public void rpcServer()
		throws Exception
	{
		ServerSocket server = null;
		
		try
		{
			server = new ServerSocket(port);
			
			for(;;) 
			{
				final Socket socket = server.accept();
				System.out.println(socket.getRemoteSocketAddress());
				new Thread(new Runnable() {

					@Override
					public void run() {

						ObjectOutputStream output = null;
						ObjectInputStream input = null;

						try
						{
							input = new ObjectInputStream(socket.getInputStream());//接受rpc client请求
							String methodName = input.readUTF();//调用方法名
							Class<?>[] parameterTypes = (Class<?>[])input.readObject();
							Object[] arguments = (Object[])input.readObject();//调用参数

							output = new ObjectOutputStream(socket.getOutputStream());
							Method method =  new HelloServiceImp().getClass().getMethod(methodName, parameterTypes);
							Object result = method.invoke(new HelloServiceImp(), arguments);//执行调用
							output.writeObject(result);//回写结果
						}
						catch(Exception e)
						{
							e.printStackTrace();
						}
						finally
						{
							try
							{
								if (output!=null)
								{
									output.close();
								}
								if (input!=null)
								{
									input.close();
								}
							}
							catch(Exception e)
							{
							}
						}
						
					}
				
				
				}).start();
			}
		}
		catch(Exception e)
		{
			throw e;
		}
		finally
		{
			if (server!=null)
			{
				server.close();
			}
		}
		
	
	}

	
	public static void main(String args[]) throws Exception
	{
		new Server().rpcServer();
	}

	
	
}


public interface HelloService {
	public String sayHello(String input);
}

public class HelloServiceImp implements HelloService {

	@Override
	public String sayHello(String input) {
		return input + " wellcome.";
	}

}

 



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [同步 异步 rpc] 推荐:

同步转异步+RPC的一个POS行业应用-业务模型介绍

- - 行业应用 - ITeye博客
最近在做一个挺有意思的POS消费项目,工作量不太大,但涉及的技术运用还挺有意思的. 可能有人奇怪,POS项目怎么用到JAVA语言了,先来简单介绍下这个项目背景:. 改造前:收银机下单,POS机下单并刷卡支付. 改造后:收银机跟POS连线,收银台直接下单并触发POS刷卡支付动作. 这里就涉及一个关键问题,POS机只能单线程工作,就是一个时刻只能干一件事情,比如打印,刷卡,跟卡主机通讯,都必须是一件件做.

同步转异步+RPC的一个POS行业应用-关键技术实现

- - 行业应用 - ITeye博客
简单回顾下业务模型:收银台<=>POS代理服务器<=>POS机,三者之间进行通讯,POS代理作为一个报文转换和同步转异步角色. 下面介绍下几个关键技术实现:. 这里的报文,指的是POS代理服务器跟POS通讯之间约定的报文. 根据POS交易类型,支付、刷卡、打印等,约定每个交易报文包含什么字段信息和长度,其中一个比较特别字段是UUID,这个字段是每个报文的关键字段,.

基于Netty的异步Rpc调用的小框架

- - Java - 编程语言 - ITeye博客
基于netty写的一个异步Rpc调用小框架,欢迎拍砖,新手. private String methodName;//调用的方法名称. private Class<?>[] types;//参数类型. private Object[] objects;//参数列表.  框架类,有两个静态方法,regist(在服务器上注册服务)和getobjt(获得接口的代理类).

Hadoop RPC机制

- - 企业架构 - ITeye博客
RPC(Remote Procedure Call Protocol)远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议. Hadoop底层的交互都是通过 rpc进行的. 例如:datanode和namenode 、tasktracker和jobtracker、secondary namenode和namenode之间的通信都是通过rpc实现的.

JAVA RPC 通讯框架

- - 经验沉淀 知识结晶
Bison 是一个JAVA 程间的通信框架,基于apache mina 实现,对mina进行了byteBuffer 缓冲区重用以及半包出处时减少拷贝. 客户端(bison-client) 功能点. 2 支持高用性:高可用的一个基本原则,可以接受快速的失败,但不能接受长时间的等待. Githup地址:https://github.com/gavenpeng/Bison.

RPC原理详解 - 永志

- - 博客园_首页
RPC 的主要功能目标是让构建分布式计算(应用)更容易,在提供强大的远程调用能力时不损失本地调用的语义简洁性.  为实现该目标,RPC 框架需提供一种透明调用机制让使用者不必显式的区分本地调用和远程调用. 下面我们将具体细化 stub 结构的实现. 客户方等待调用执行完成并返回结果. 客户方调用后不用等待执行结果返回,但依然可以通过回调通知等方式获取返回结果.

Avro RPC 对比测试

- - 行业应用 - ITeye博客
J2EE平台常采用多层分布式的架构体系. 分布式服务节点之间需要通讯和交互(业务节点和资源节点之间),服务端和客户端需要交互(终端客户端需要调用服务端的远程服务,客户端有C实现的,也有Java等其他语言实现的). 因此基础平台需要提供一个稳定、高效的、可伸缩的RPC服务性组件. 稳定,高性能;作为一个基础性的骨架组件,高可用性和高性能是必备的;传输层希望是面向连接的TCP通信.

zmq-rpc:基于zeromq网络层编写的protobuf RPC框架

- Shengbin - codedump
阅读过zmq的代码之后,感觉这个网络层是我目前见过最高效的–线程之间使用lockfree的消息队列保存消息,可以启动多个I/O线程分担压力等等特性.于是决定基于它写一个protobuf RPC的框架.. 另外,这里使用的protobuf是旧版本2.3.0,新版本2.4.1的生成的RPC service接口跟原来不太一致,暂时还没有去研究它.BTW,升级版本之后导致原来的接口发生变化这是一个很操蛋的事情..

【RPC框架HttpInvoker一】HttpInvoker:Spring自带RPC框架

- - 开源软件 - ITeye博客
HttpInvoker是Spring原生的RPC调用框架,HttpInvoker同Burlap和Hessian一样,提供了一致的服务Exporter以及客户端的服务代理工厂Bean,这篇文章主要是复制粘贴了Hessian与Spring集成一文,. 【RPC框架Hessian四】Hessian与Spring集成.

从同步到异步,从匿名到实名

- keso - 乱象,印迹
题记:完成正则表达式的书稿,对许多事情不再麻木,慢慢恢复写点东西的意识. 虽然对网络没太多钻研,但从97年至今,自己上网的历史也有十多年了. 在我眼里,这些年来网络呈现出两个趋势:从同步到异步,从匿名到实名,所以写了这篇散记. 先说从同步到异步(这里借用了计算机专业中的两个术语,有必要为不熟悉的读者做一点解释:异步是一种松散的通讯模式,一方发送消息之后,不需要等待对方回复,即可以接续处理,电子邮件就是最常见的异步通讯模式;同步则是较为紧密的通讯模式,一方发送消息之后,需要等到对方回复,才可以接续处理).