博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用t-io实现简单的rpc调用(一)
阅读量:7206 次
发布时间:2019-06-29

本文共 15094 字,大约阅读时间需要 50 分钟。

hot3.png

1.先从最基础的来 编写接口及实现类

/**
 * Service contract exposed over the RPC channel.
 */
public interface IUserService {

	/**
	 * No-argument variant.
	 *
	 * @return the listing as a string
	 */
	String getList();

	/**
	 * Variant that takes an id and a name.
	 *
	 * @param id   identifier passed by the caller
	 * @param name name passed by the caller
	 * @return the listing as a string
	 */
	String getList(Integer id, String name);
}
import cn.ensoft.service.IUserService;

/**
 * Sample {@link IUserService} implementation that answers with fixed-format,
 * single-quoted JSON-like text.
 */
public class UserServiceImpl implements IUserService {

	/** Template for the two-argument answer; both placeholders are rendered via their string form. */
	private static final String RECORD_TEMPLATE = "{'id':'%s','name':'%s'}";

	/**
	 * @return a constant sample record
	 */
	@Override
	public String getList() {
		return "{'id':'abc','name':'hello'}";
	}

	/**
	 * @param id   value placed into the {@code id} field
	 * @param name value placed into the {@code name} field
	 * @return the record text containing both values (no escaping is applied)
	 */
	@Override
	public String getList(Integer id, String name) {
		return String.format(RECORD_TEMPLATE, id, name);
	}
}

2.t-io自定义公共类

    2.1业务消息包

import java.io.UnsupportedEncodingException;
import org.tio.core.intf.Packet;

/**
 * Business packet exchanged between the RPC client and server.
 * The payload is carried as a raw byte array.
 */
public class MsgPacket extends Packet {

	/** Charset name used throughout this example when converting message text to and from bytes. */
	public static final String CHARSET = "GB18030";

	private byte[] body;

	/** Creates a packet without a body. */
	public MsgPacket() {
	}

	/**
	 * Creates a packet whose body is {@code msg} encoded with {@link #CHARSET}.
	 *
	 * @param msg text to carry; must not be null
	 * @throws IllegalStateException if the JVM does not support {@link #CHARSET}
	 */
	public MsgPacket(String msg) {
		try {
			this.body = msg.getBytes(MsgPacket.CHARSET);
		} catch (UnsupportedEncodingException e) {
			// Previously swallowed, which silently produced a packet with a null body.
			throw new IllegalStateException("Charset not supported by this JVM: " + CHARSET, e);
		}
	}

	/**
	 * @return the body
	 */
	public byte[] getBody() {
		return body;
	}

	/**
	 * @param body the body to set
	 */
	public void setBody(byte[] body) {
		this.body = body;
	}
}

    2.2编码&解码

import java.nio.ByteBuffer;import org.tio.core.ChannelContext;import org.tio.core.GroupContext;import org.tio.core.exception.AioDecodeException;import org.tio.core.intf.AioHandler;/** * 服务器端和客户端的编码解码算法是一样的,所以抽象一个公共的父类出来 * @author tanyaowu  */public abstract class MsgHandler implements AioHandler
{ /** * 编码:把业务消息包编码为可以发送的ByteBuffer * 总的消息结构:消息头 + 消息体 * 消息头结构: 4个字节,存储消息体的长度 * 消息体结构: 对象的json串的byte[] */ @Override public ByteBuffer encode(MsgPacket packet, GroupContext
groupContext, ChannelContext
channelContext){ byte[] body = packet.getBody(); int bodyLen = 0; if (body != null){ bodyLen = body.length; } //bytebuffer的总长度是 = 消息头的长度 + 消息体的长度 int allLen = bodyLen; //创建一个新的bytebuffer ByteBuffer buffer = ByteBuffer.allocate(allLen); //设置字节序 buffer.order(groupContext.getByteOrder()); //写入消息体 if (body != null){ buffer.put(body); } return buffer; } /** * 解码:把接收到的ByteBuffer,解码成应用可以识别的业务消息包 */ @Override public MsgPacket decode(ByteBuffer buffer, ChannelContext
channelContext) throws AioDecodeException{ int readableLength = buffer.limit() - buffer.position();//真实数据长度 if (readableLength == 0){ return null; } byte[] dst = new byte[readableLength]; MsgPacket imPacket = new MsgPacket(); buffer.get(dst, 0, readableLength); imPacket.setBody(dst); return imPacket; }}

    2.3 服务端与客户端约定的端口、IP 以及客户端连接超时时间

/**
 * Connection settings agreed between the RPC server and the RPC client.
 */
public class MsgConst {

	/** Address the server binds to and the client connects to. */
	public static final String IP = "127.0.0.1";

	/** TCP port used by both sides. */
	public static final int PORT = 6789;

	/** How long the client waits for a reply, in milliseconds. */
	public static final int TIME_OUT = 10_000;
}

    2.4 内容的编码与解码(转换为十六进制字符串)

import java.io.ByteArrayOutputStream;import java.io.UnsupportedEncodingException;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;/**  * 进制之间的转换  * @author jwzhangjie  *  */public class HexadecimalConver {private static String hexString="0123456789ABCDEF;{}\"'";  	/**  	 * 将字符串编码成16进制数字,适用于所有字符(包括中文) 	 * @throws UnsupportedEncodingException 	*/	public static String encode(String str) throws UnsupportedEncodingException{			//根据默认编码获取字节数组			byte[] bytes= str.getBytes(MsgPacket.CHARSET);			StringBuilder sb = new StringBuilder(bytes.length*2);			//将字节数组中每个字节拆解成2位16进制整数			for(int i=0;i
>4)); sb.append(hexString.charAt((bytes[i]&0x0f)>>0)); } return sb.toString(); } /** * 将16进制数字解码成字符串,适用于所有字符(包括中文) * @throws UnsupportedEncodingException */ public static String decode(String bytes) throws UnsupportedEncodingException{ ByteArrayOutputStream baos=new ByteArrayOutputStream(bytes.length()/2); //将每2位16进制整数组装成一个字节 for(int i=0;i
[] argsType,Object[] args){ JSONObject jsonObject = new JSONObject(); jsonObject.put("inf", inf); jsonObject.put("method", method); jsonObject.put("argsType", argsType); jsonObject.put("args", args); return JSON.toJSONString(jsonObject); } public static String genWriteCode(String json){ try { return HexadecimalConver.encode(json); } catch (UnsupportedEncodingException e) { return null; } }}

3.编写tio-server用于客户端远程调用

    3.1 服务端入口

import java.io.IOException;import java.util.HashMap;import java.util.Map;import org.tio.server.AioServer;import org.tio.server.ServerGroupContext;import org.tio.server.intf.ServerAioHandler;import org.tio.server.intf.ServerAioListener;public class MsgServerStarter{		public static Map
> regist; //handler, 包括编码、解码、消息处理 public static ServerAioHandler
aioHandler = new MsgServerAioHandler(); //事件监听器,可以为null,但建议自己实现该接口,可以参考showcase了解些接口 public static ServerAioListener
aioListener = null; //一组连接共用的上下文对象 public static ServerGroupContext
serverGroupContext = new ServerGroupContext<>(aioHandler,aioListener); //aioServer对象 public static AioServer
aioServer = new AioServer<>(serverGroupContext); //有时候需要绑定ip,不需要则null public static String serverIp = MsgConst.IP; //监听的端口 public static int serverPort = MsgConst.PORT; /** * 启动程序入口 * @throws IOException * @throws IllegalAccessException * @throws InstantiationException */ public static void main(String[] args) throws IOException { regist = new HashMap
>(); regist.put(IUserService.class.getName(),UserServiceImpl.class); aioServer.start(serverIp, serverPort); }}

    3.2消息处理器

import java.io.UnsupportedEncodingException;import org.nutz.log.Log;import org.nutz.log.Logs;import org.tio.core.Aio;import org.tio.core.ChannelContext;import org.tio.server.intf.ServerAioHandler;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONArray;import com.alibaba.fastjson.JSONObject;public class MsgServerAioHandler extends MsgHandler implements ServerAioHandler
{ private static final Log log = Logs.get(); /** * 处理消息 * @throws UnsupportedEncodingException */ @Override public Object handler(MsgPacket packet, ChannelContext
channelContext) throws UnsupportedEncodingException { MsgPacket resppacket = new MsgPacket(); byte[] body = packet.getBody(); if (body != null) { String encryCode = new String(body, MsgPacket.CHARSET); log.debugf("服务端收到消息:%s", encryCode); try { String requestJson = HexadecimalConver.decode(encryCode); log.debugf("解析数据:%s", requestJson); JSONObject requestObj = JSON.parseObject(requestJson); String inf = requestObj.getString("inf"); String method = requestObj.getString("method"); JSONArray argsTypeB = requestObj.getJSONArray("argsType"); Class
[] argsType = new Class[argsTypeB.size()]; for (int i = 0; i < argsTypeB.size(); i++) { String classzz = argsTypeB.getString(i); argsType[i] = Class.forName(classzz); } JSONArray argsB = requestObj.getJSONArray("args"); Object[] args = new Object[argsB.size()]; for (int i = 0; i < argsB.size(); i++) { Object atp = argsB.get(i); args[i] = atp; } Class
bindClazz = MsgServerStarter.regist.get(inf); Object bindObj = bindClazz.newInstance(); String responseMsg = (String)bindClazz.getMethod(method,argsType).invoke(bindObj,args); String responseCode = HexadecimalConver.genWriteCode(responseMsg); resppacket.setBody(responseCode.getBytes(MsgPacket.CHARSET)); } catch (Exception e) { log.error("该消息无法解析",e); log.errorf("msg", e); resppacket.setBody(FAIL.getBytes(MsgPacket.CHARSET)); } Aio.send(channelContext, resppacket); } return null; }}

4.编写tio-client客户端

    4.1客户端入口

import java.util.HashMap;import java.util.Map;import org.tio.client.AioClient;import org.tio.client.ClientChannelContext;import org.tio.client.ClientGroupContext;import org.tio.client.ReconnConf;import org.tio.client.intf.ClientAioHandler;import org.tio.client.intf.ClientAioListener;import org.tio.core.Node;public class MsgClientStarter{	public static Map
result; //服务器节点 public static Node serverNode = new Node(MsgConst.IP,MsgConst.PORT); //handler, 包括编码、解码、消息处理 public static ClientAioHandler
aioClientHandler = new MsgClientAioHandler(); //事件监听器,可以为null,但建议自己实现该接口,可以参考showcase了解些接口 public static ClientAioListener
aioListener = null; //new MsgClientAioListener(); //断链后自动连接的,不想自动连接请设为null private static ReconnConf
reconnConf = null; //private static ReconnConf
reconnConf = new ReconnConf
(5000L); //一组连接共用的上下文对象 public static ClientGroupContext
clientGroupContext = new ClientGroupContext<>(aioClientHandler, aioListener, reconnConf); public static AioClient
aioClient = null; public static ClientChannelContext
clientChannelContext = null; /** * 启动程序入口 * @throws Exception */ public static void instance() throws Exception { if(aioClient==null){ aioClient = new AioClient
(clientGroupContext); clientChannelContext = aioClient.connect(serverNode); clientChannelContext.setUserid(java.util.UUID.randomUUID().toString().toUpperCase()); result = new HashMap
(); } } }

    4.2消息处理器

import org.tio.client.intf.ClientAioHandler;
import org.tio.core.ChannelContext;
import cn.ensoft.tio.MsgHandler;
import cn.ensoft.tio.MsgPacket;
import cn.ensoft.util.HexadecimalConver;

/**
 * Client-side message handler: stores each decoded reply so the waiting caller can pick it up.
 */
public class MsgClientAioHandler extends MsgHandler implements ClientAioHandler<Object, MsgPacket, Object> {

	/**
	 * Decodes a reply and publishes it in {@code MsgClientStarter.result} under the channel's user id.
	 * A packet without a body is ignored.
	 *
	 * @param packet         the received packet
	 * @param channelContext channel the packet arrived on
	 * @return always null
	 * @throws Exception if the body cannot be decoded
	 */
	@Override
	public Object handler(MsgPacket packet, ChannelContext<Object, MsgPacket, Object> channelContext)
			throws Exception {
		byte[] body = packet.getBody();
		if (body != null) {
			String str = new String(body, MsgPacket.CHARSET);
			System.out.println("CLIENT=>>收到消息:" + str);
			// Decode once and reuse the value for both the console output and the result map.
			String decoded = HexadecimalConver.decode(str);
			System.out.println("CLIENT=>>收到消息:" + decoded);
			MsgClientStarter.result.put(channelContext.getUserid(), decoded);
		}
		return null;
	}

	/**
	 * @return null, i.e. this handler supplies no heartbeat packet
	 */
	@Override
	public MsgPacket heartbeatPacket() {
		return null;
	}
}

5.代理

import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import org.nutz.log.Log;import org.nutz.log.Logs;import org.tio.core.Aio;import cn.ensoft.client.MsgClientStarter;import cn.ensoft.tio.MsgPacket;import cn.ensoft.tio.MsgConst;public class ClientProxy
implements InvocationHandler{ private static final Log log = Logs.get(); private Object targetClazz; public ClientProxy(final Class
clazz) throws InstantiationException, IllegalAccessException { this.targetClazz = clazz.newInstance(); } public Object bind() { return Proxy.newProxyInstance(targetClazz.getClass().getClassLoader(),targetClazz.getClass().getInterfaces(), this); } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { MsgClientStarter.instance(); String impl = targetClazz.getClass().getCanonicalName(); String inf = targetClazz.getClass().getInterfaces()[0].getName(); String methodName = method.getName(); Class
[] argsType = method.getParameterTypes(); log.debugf("参数-IMPL=%s", impl); log.debugf("参数-INF=%s", inf); log.debugf("参数-method=%s", methodName); log.debugf("参数-argsType=%s",JSON.toJSONString(argsType)); log.debugf("参数-args=%s", JSON.toJSONString(args)); String requestJson = HexadecimalConver.genWriteJson(inf, methodName,argsType, args); String requestCode = HexadecimalConver.genWriteCode(requestJson); log.debugf("requestJson=%s", requestJson); log.debugf("requestCode=%s", requestCode); log.debugf("requestCodeD=%s", HexadecimalConver.decode(requestCode)); MsgPacket packet = new MsgPacket(); packet.setBody(requestCode.getBytes(MsgPacket.CHARSET)); Aio.send(MsgClientStarter.clientChannelContext, packet); long stimes = System.currentTimeMillis(); String result = null; while(true){ result = MsgClientStarter.result.get(MsgClientStarter.clientChannelContext.getUserid()); if(null != result){ break; }else{ if((System.currentTimeMillis()-stimes)>=MsgConst.TIME_OUT){ break; } } } Aio.remove(MsgClientStarter.clientChannelContext, "sucess"); return result; }}

6.测试

/**
 * Manual smoke test: performs one RPC call against a running server and prints the reply.
 */
public class Test {

	/**
	 * @param args unused
	 */
	public static void main(String[] args) {
		try {
			IUserService userService = (IUserService) new ClientProxy<>(UserServiceImpl.class).bind();
			String msg = userService.getList(888, "hello-rpc-" + java.util.UUID.randomUUID());
			System.out.println("============" + msg);
			System.exit(0);
		} catch (Exception e) {
			e.printStackTrace();
			// The success path needs an explicit exit, so presumably client threads keep the JVM
			// alive; exit here too, with a non-zero status, instead of leaving the process running.
			System.exit(1);
		}
	}
}

先启动服务端

10:28:26,196 WARN AioServer:90 - t-io server started, listen on 127.0.0.1:6789

然后运行Test

10:34:18,356 INFO AioClient:338 - [1]: curr:0, closed:0, received:(0p)(0b), handled:0, sent:(0p)(0b)
10:34:18,356 INFO ConnectionCompletionHandler:98 - connected to 127.0.0.1:6789
10:34:18,356 DEBUG ClientProxy:39 - 参数-IMPL=cn.ensoft.service.impl.UserServiceImpl
10:34:18,356 DEBUG ClientProxy:40 - 参数-INF=cn.ensoft.service.IUserService
10:34:18,356 DEBUG ClientProxy:41 - 参数-method=getList
10:34:18,387 DEBUG ClientProxy:42 - 参数-argsType=["java.lang.Integer","java.lang.String"]
10:34:18,387 DEBUG ClientProxy:43 - 参数-args=[888,"hello-rpc-e7f99901-3f9c-41cc-8b94-59cf6d0d31e6"]
10:34:18,402 DEBUG ClientProxy:46 - requestJson={"inf":"cn.ensoft.service.IUserService","args":[888,"hello-rpc-e7f99901-3f9c-41cc-8b94-59cf6d0d31e6"],"method":"getList","argsType":["java.lang.Integer","java.lang.String"]}
10:34:18,402 DEBUG ClientProxy:47 - requestCode=7B22696E66223A22636E2E656E736F66742E736572766963652E495573657253657276696365222C2261726773223A5B3838382C2268656C6C6F2D7270632D65376639393930312D336639632D343163632D386239342D353963663664306433316536225D2C226D6574686F64223A226765744C697374222C226172677354797065223A5B226A6176612E6C616E672E496E7465676572222C226A6176612E6C616E672E537472696E67225D7D
10:34:18,402 DEBUG ClientProxy:48 - requestCodeD={"inf":"cn.ensoft.service.IUserService","args":[888,"hello-rpc-e7f99901-3f9c-41cc-8b94-59cf6d0d31e6"],"method":"getList","argsType":["java.lang.Integer","java.lang.String"]}
CLIENT=>>收到消息:7B276964273A27383838272C276E616D65273A2768656C6C6F2D7270632D65376639393930312D336639632D343163632D386239342D353963663664306433316536277D
CLIENT=>>收到消息:{'id':'888','name':'hello-rpc-e7f99901-3f9c-41cc-8b94-59cf6d0d31e6'}
10:34:18,408 INFO DecodeRunnable:151 - 0.0.0.0:57205 收到消息
10:34:18,408 DEBUG DecodeRunnable:168 - 0.0.0.0:57205,组包后,数据刚好用完
============{'id':'888','name':'hello-rpc-e7f99901-3f9c-41cc-8b94-59cf6d0d31e6'}
10:34:18,408 INFO CloseRunnable:120 - 准备关闭连接:0.0.0.0:57205, isNeedRemove:true, sucess
10:34:18,408 INFO Aio:78 - 0.0.0.0:57205 正在等待被关闭

 

写得不好,请赐教,勿喷 谢谢!

转载于:https://my.oschina.net/longtutengfei/blog/892053

你可能感兴趣的文章
概率图论模型(probabilistic graphical model)的由来—— 它的本质是什么
查看>>
day02-字符串、字典
查看>>
使用Oracle SQL Developer报错:Unable to find a Java Virtual Machine
查看>>
ArcFace 2.0 人脸识别Android 版demo
查看>>
Iframe传值
查看>>
Activity生命周期
查看>>
如何理解人工智能、机器学习和深度学习三者的关系
查看>>
Css3小技术
查看>>
[备份]Emacs配置文件
查看>>
Android页面切换方法和区别
查看>>
go语言的错误处理
查看>>
The type org.apache.commons.pool.impl.GenericObjectPool$Config cannot be resolved. It is indirectly
查看>>
数论+DP HDOJ 4345 Permutation
查看>>
BZOJ4337:[BJOI2015]树的同构——题解
查看>>
底层文件访问之一:write系统调用
查看>>
asp.net 事务回滚
查看>>
[CF]Round510
查看>>
VS2010 C#调用C++ DLL文件
查看>>
程序的测试
查看>>
Python进阶【第一篇】:Python简介
查看>>