赵玉伟的博客

用动态代理实现RPC调用

RPC,即 Remote Procedure Call 三个单词的第一个字母组合,翻译为:远程过程调用。 在对象中,“处理过程”由方法包装,所以,RPC调用也即远程方法调用。通俗的讲,进程可以像调用本地(本地指同一个进程)方法一样去调用远程方法。
一张简图:

如何实现

条件A:依赖接口。

对于服务提供者,需要定义一批接口, 这些接口即提供的服务。 具体做哪些事由实现了这些接口的类完成。
对于服务消费者,需要持有服务提供者定义的接口,然后,调用某个接口的某个方法,完成一次RPC调用。
对于消费者,因为仅仅持有接口,而接口是不能被实例化的,那么,接口的方法是怎么调用的呢?答案是通过动态代理,利用动态代理,在本地构建一个实现了接口的类。

条件B:依赖网络传输。

既然需要跨服务, 就必须需要网络,可以基于TCP层实现, 也可以基于Http层实现网络传输。

条件C:依赖数据的序列化与反序列化。

数据如果被传输,那么只能是二进制序列,而我们在进程中使用的是对象,所以,对象需要能够被序列化。
利用上述三个条件,便可以写一个最基本的PRC调用程序。

具体代码实现最基本的功能

下面写一个demo:
首先准备一个接口和一个实现类

接口:

1
2
3
4
package com.rpc.server;
public interface Animal {
public String say(String name);
}

实现类:

1
2
3
4
5
6
7
8
package com.rpc.server;
public class Dog implements Animal{
@Override
public String say(String name) {
String voice = name + " : 汪.. 汪汪...";
return voice;
}
}

写一个服务提供者,轮询端口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package com.rpc.server;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
public class Server {
// 启动后,监听的端口号
public static final int PORT = 12345;
// 服务启动后, 用来存储提供的接口
private Map<String, Object> scan = new HashMap<String, Object>();
private ServerSocket serverSocket;
// 注册服务
public void register(){
Animal animal = new Dog();
scan.put(Animal.class.getName(), animal);
}
// 请求进入后的入口
public void accept(){
try {
serverSocket = new ServerSocket(PORT);
while (true) {
// 阻塞方法,等待请求的到来
Socket socket = serverSocket.accept();
// 处理本次请求的线程
new TaskThread(socket);
}
} catch (Exception e) {
System.out.println("服务器异常: " + e);
}
}
/**
* 处理具体认为的线程,也叫work线程
*/
private class TaskThread implements Runnable {
private Socket socket;
public TaskThread(Socket client) {
socket = client;
new Thread(this).start();
}
@Override
public void run() {
try {
// 读取客户端的数据
ObjectInputStream input = new ObjectInputStream (socket.getInputStream());
// 读取客户端传入的类
Class<?> clazz = (Class<?>)input.readObject();
// 读取客户端写入的方法
String methodName = input.readUTF();
// 读取客户端写入的方法参数
Class<?>[] parameterTypes = (Class<?>[])input.readObject();
//读取调用方法所需要的参数
Object[] arguments = (Object[])input.readObject();
// 通过反射获取调用的方法对象
Method method = clazz.getMethod(methodName, parameterTypes);
// 根据传入的类对象,获取真实的对象
Object obj = scan.get(clazz.getName());
// 在获取的对象上调用传入的方法
Object result = method.invoke(obj, arguments);
// 将方法执行的结果写入流
ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
out.writeObject(result);
out.close();
input.close();
} catch (Exception e) {
System.out.println("处理任务异常: " + e);
} finally {
// 执行清理工作
}
}
}
public static void main(String[] args) {
Server server = new Server();
server.register();
server.accept();
}
}

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package com.rpc.client;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;
import com.rpc.server.Animal;
public class ClientStub {
// IP
private String host;
// 端口
private int port;
public ClientStub(String host, int port){
this.host = host;
this.port = port;
}
// 根据接口类型,获取一个代理类
@SuppressWarnings("unchecked")
public <T> T getInstance(Class<T> interfaceClass){
if(interfaceClass == null){
throw new IllegalArgumentException("interface is null...");
}
if(!interfaceClass.isInterface()){
throw new IllegalArgumentException("param's type is not interface");
}
// 获取代理类
T instance = (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler(){
// 需要执行的方法
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 连接远程服务
Socket socket = new Socket(host, port);
Object result;
try {
// 获取socket的输出流
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
// 将类对象写入流中
output.writeObject(interfaceClass);
// 将调用方法的名称写入流中
output.writeUTF(method.getName());
// 将调用方法的参数类型写入流中
output.writeObject(method.getParameterTypes());
// 将方法的参数写入流中
output.writeObject(args);
// 发射,从流中获取数据
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
try {
result = input.readObject();
} finally {
input.close();
}
} finally {
socket.close();
}
return result;
}
});
return instance;
}
public static void main(String[] args) {
ClientStub client = new ClientStub("127.0.0.1", 12345);
Animal animal = client.getInstance(Animal.class);
System.out.println(animal.say("小狗"));
}
}

以上,我们便利用动态代理、socket通信、java的ObjectInputStream、ObjectOutputStream做了一个最简单的PRC应用。但这仅仅是个例子,只拿来做演示用。

扩充

根据以上的例子,我们明白了一个RPC调用应该如何实现。那么,更进一步,我们如何写一个RPC框架?
1、完善通信协议
数据传输依赖于网络协议,而网络具有不可靠性,所以,通信协议需要做一系列的“弥补措施”, 来保证又快又稳定的传输数据。这方面大部分公司使用netty。netty基于NIO,而且又解决了很多TCP中比如粘包拆包的问题。
2、改善序列化算法,提高序列化的效率。
一个对象如果被网络传输, 需要将其序列化成二进制流。也就是将对象按照某种规则有序的排列起来。序列化至少要考虑两个方面,1、序列化算法的性能; 2、二进制的大小。 这两方面受所要序列化对象大小的影响,所以,合适是最好的。
3、启用多线程,提高服务的tps
一旦请求上量之后,必须要考虑请求的RT。 用机器现有的资源,最大化的处理数量。做好这一点,应该先看一下java原生的线程池 ThreadPoolExecutor的实现。
4、服务的注册与发现
服务的注册, 也就是服务启动后,将正常服务的快照保存一份,同时, 将这份快照推给消费者。 这一步利用zookeeper可以完成。

以上四点我认为是最基本的,用在工程上,其他的监控、管理等等也是必不可少的组件。