手写RPC--简易版

之前在网上看很多公司在面试的时候都会问到Dubbo等一些RPC框架,更有甚者直接要求面试者手写PRC,今天就来撩下RPC。

什么是RPC

RPC(Remote Procedure Call),远程过程调用,意思是可以在一台机器上调用远程的服务。在非分布式环境下,我们的程序调用服务都是本地调用,但是随着分布式结构的普遍,越来越多的应用需要解耦,将不同的独立功能部署发布成不同的服务供客户端调用,RPC就是为了解决这个问题的。RPC是一种规范,和TCP、UDP都没有关系,RPC可以采用TCP协议完成数据传输,甚至可以使用HTTP应用协议。RPC是C端模式,包含了服务端(服务提供方)、客户端(服务使用方),采用特定的网络传输协议,把数据按照特定的协议包装后进行传输操作等操作。

RPC原理

首先,我们心里带着这样的问题:要怎么样去调用远程的服务呢?

①肯定要知道IP和端口吧(确定唯一一个进程);

②肯定要知道调用什么服务吧(方法名和参数);

③调用服务后可能需要结果吧。

这三点又怎么实现呢?
RPC的设计由Client,Client stub,Network,Server stub,Server构成。
其中Client就是用来调用服务的,Cient stub是用来把调用的方法和参数序列化的(因为要在网络中传输,必须要把对象转变成字节),网络用来传输这些信息到服务器存根,服务器存根用来把这些信息反序列化的,服务器就是服务的提供者,最终调用的就是服务器提供的方法。
RPC的结构如下图:

img

图中1-10序号的含义如下:

  1. 客户端像调用本地服务似的调用远程服务;
  2. 客户端stub接收到调用后,将类名,方法名,参数列表序列化;
  3. 客户端通过插座将消息发送到服务端;
  4. Server stub收到消息后进行解码(将消息对象反序列化);
  5. 服务器存根根据解码结果调用本地的服务;
  6. 本地服务执行(对于服务端来说是本地执行)并将结果返回给服务器存根;
  7. 服务器存根将返回结果打包成消息(将结果消息对象序列化);
  8. 服务端通过插座将消息发送到客户端;
  9. 客户端stub接收到结果消息,并进行解码(将结果消息发序列化);
  10. 客户端得到最终结果。

这就是一个完成PRC调用过程,对使用方而言就只暴露了本地代理对象,剩下的数据解析、运输等都被包装了,从服务提供方的角度看还有服务暴露,如下是Dubbo的架构图:
img

简易RPC实现

项目目录

img

MethodParameter对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.springboot.whb.study.rpc.rpc_v1;

import com.alibaba.fastjson.JSON;
import lombok.Data;

import java.io.InputStream;
import java.io.ObjectInputStream;

/**
* @author: whb
* @description: 请求对象
*/
@Data
public class MethodParameter {

/**
* 类名
*/
private String className;

/**
* 方法名
*/
private String methodName;

/**
* 参数
*/
private Object[] arguments;

/**
* 参数类型
*/
private Class<?>[] parameterTypes;

@Override
public String toString() {
return JSON.toJSONString(this);
}

/**
* 从输入流中读取出类名、方法名、参数等数据组装成一个MethodParameter
*
* @param inputStream
* @return
*/
public static MethodParameter convert(InputStream inputStream) {
try {
ObjectInputStream input = new ObjectInputStream(inputStream);
String className = input.readUTF();
String methodName = input.readUTF();
Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
Object[] arguments = (Object[]) input.readObject();
MethodParameter methodParameter = new MethodParameter();
methodParameter.setClassName(className);
methodParameter.setMethodName(methodName);
methodParameter.setArguments(arguments);
methodParameter.setParameterTypes(parameterTypes);
return methodParameter;
} catch (Exception e) {
throw new RuntimeException("解析请求错误:{}", e);
}
}
}

服务端-服务暴露

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.springboot.whb.study.rpc.rpc_v1;

import com.springboot.whb.study.rpc.rpc_v1.MethodParameter;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/**
* @author: whb
* @description: 服务端暴露
* 服务暴露存储在objectMap对象中,所有可对提供的服务都必须添加到该容器中,以便于收到网络数据后能找到对应的服务,然后采用反射invoke调用,返回得到的结果。
*/
public class RpcExploreService {

private Map<String, Object> objectMap = new HashMap<>();

/**
* 可对外提供的服务
*
* @param className
* @param object
*/
public void explore(String className, Object object) {
objectMap.put(className, object);
}

/**
* 采用反射进行调用
*
* @param methodParameter
* @return
*/
public Object invoke(MethodParameter methodParameter) {
Object object = objectMap.get(methodParameter.getClassName());
if (object == null) {
throw new RuntimeException("无对应的执行类:【" + methodParameter.getClassName() + "】");
}
Method method = null;
try {
method = object.getClass().getMethod(methodParameter.getMethodName(), methodParameter.getParameterTypes());
} catch (NoSuchMethodException e) {
throw new RuntimeException("执行类:【" + methodParameter.getClassName() + "】无对应的执行方法:【" + methodParameter.getMethodName() + "】");
}
try {
Object result = method.invoke(object, methodParameter.getArguments());
System.out.println(methodParameter);
return result;
} catch (Exception e) {
throw new RuntimeException("invoke方法执行失败:" + e.getMessage());
}
}
}

服务端-网络数据处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package com.springboot.whb.study.rpc.rpc_v1;

import com.google.common.base.Joiner;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* @author: whb
* @description: 服务端-网络数据处理
* 简单的BIO模型,开启了一个ServerSocket后,接收到数据后就将套接字丢给一个新的线程处理,ServerSocketRunnable接收一个socket之后,
* 解析出MethodParameter请求对象,然后调用服务暴露的Invoke方法,再写回socket传输给客户端。
*/
public class IOService implements Runnable {

private int port;

private ServerSocket serverSocket;

private RpcExploreService rpcExploreService;

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 100, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000),
new BasicThreadFactory.Builder().namingPattern(Joiner.on("-").join("thread-pool-", "%s")).build());

private volatile boolean flag;

public IOService(RpcExploreService rpcExploreService, int port) throws IOException {
this.rpcExploreService = rpcExploreService;
this.port = port;
this.serverSocket = new ServerSocket(port);
this.flag = true;
System.out.println("******服务端启动了********");

//优雅关闭
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
flag = false;
System.out.println("+++++服务端关闭了+++++");
}));
}

@Override
public void run() {
while (true) {
Socket socket = null;
try {
socket = serverSocket.accept();
} catch (IOException e) {

}
if (socket == null) {
continue;
}
threadPoolExecutor.execute(new ServerSocketRunnable(socket));
}
}

class ServerSocketRunnable implements Runnable {
private Socket socket;

public ServerSocketRunnable(Socket socket) {
this.socket = socket;
}

@Override
public void run() {
try {
InputStream inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream();
MethodParameter methodParameter = MethodParameter.convert(inputStream);
Object result = rpcExploreService.invoke(methodParameter);
ObjectOutputStream output = new ObjectOutputStream(outputStream);
output.writeObject(result);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (socket != null) {
try {
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
}

客户端-服务订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package com.springboot.whb.study.rpc.rpc_v1;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

/**
* @author: whb
* @description: 客户端-服务订阅
* 服务使用方需要使用register进行服务的注册,会生成对应的本地代理对象,后续只需要通过本地代理对象。
*/
public class RpcUsedService {

private Map<String, Object> proxyObejctMap = new HashMap<>();
private Map<String, Class> classMap = new HashMap<>();
private IOClient ioClient;

public void setIoClient(IOClient ioClient) {
this.ioClient = ioClient;
}

/**
* 服务注册
*
* @param clazz
*/
public void register(Class clazz) {
String className = clazz.getName();
classMap.put(className, clazz);
if (!clazz.isInterface()) {
throw new RuntimeException("暂时只支持接口类型的");
}
try {
RpcInvocationHandler handler = new RpcInvocationHandler();
handler.setClazz(clazz);
Object proxyInstance = Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, handler);
proxyObejctMap.put(className, proxyInstance);
} catch (Exception e) {
e.printStackTrace();
}
}

public <T> T get(Class<T> clazz) {
String className = clazz.getName();
return (T) proxyObejctMap.get(className);
}

class RpcInvocationHandler implements InvocationHandler {
private Class clazz;

public void setClazz(Class clazz) {
this.clazz = clazz;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
MethodParameter methodParameter = new MethodParameter();
methodParameter.setClassName(clazz.getName());
methodParameter.setMethodName(method.getName());
methodParameter.setArguments(args);
methodParameter.setParameterTypes(method.getParameterTypes());
return ioClient.invoke(methodParameter);
}
}
}

客户端-网络处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.springboot.whb.study.rpc.rpc_v1;

import java.io.*;
import java.net.Socket;

/**
* @author: whb
* @description: 客户端-网络处理
* 代理对象被调用后生成一个MethodParameter对象,通过此IOClient把数据传输到服务端,并且返回对应的数据。
*/
public class IOClient {

private String ip;
private int port;

public IOClient(String ip, int port) {
this.ip = ip;
this.port = port;
}

public Object invoke(MethodParameter methodParameter) {
Socket socket = null;
try {
socket = new Socket(ip, port);
OutputStream outputStream = socket.getOutputStream();
ObjectOutputStream output = new ObjectOutputStream(outputStream);
output.writeUTF(methodParameter.getClassName());
output.writeUTF(methodParameter.getMethodName());
output.writeObject(methodParameter.getParameterTypes());
output.writeObject(methodParameter.getArguments());
InputStream inputStream = socket.getInputStream();
ObjectInputStream input = new ObjectInputStream(inputStream);
return input.readObject();
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
throw new RuntimeException("socket关闭失败");
}
}
}
return null;
}
}

实践-服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.springboot.whb.study.rpc.rpc_v1;

import com.springboot.whb.study.rpc.rpc_v1.expore.HelloWorldImpl;

/**
* @author: whb
* @description: 服务端
*/
public class Service {

public static void main(String[] args) {
RpcExploreService rpcExploreService = new RpcExploreService();
//传入的字符串是接口的全名称
rpcExploreService.explore("com.springboot.whb.study.rpc.rpc_v1.expore.HelloWorld", new HelloWorldImpl());
try {
//开启11111端口监听服务
Runnable ioService = new IOService(rpcExploreService, 11111);
new Thread(ioService).start();
} catch (Exception e) {

}
}
}

实践-客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.springboot.whb.study.rpc.rpc_v1;

import com.google.common.base.Joiner;
import com.springboot.whb.study.rpc.rpc_v1.expore.HelloWorld;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* @author: whb
* @description: 客户端
*/
public class Client {
public static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 100, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000),
new BasicThreadFactory.Builder().namingPattern(Joiner.on("-").join("client-thread-pool-", "%s")).build());

public static void main(String[] args) {
RpcUsedService rpcUsedService = new RpcUsedService();
rpcUsedService.register(HelloWorld.class);
try {
IOClient ioClient = new IOClient("127.0.0.1", 11111);
//网络套接字连接 同上是10001端口
rpcUsedService.setIoClient(ioClient);
HelloWorld helloWorld = rpcUsedService.get(HelloWorld.class);
//生成的本地代理对象 proxy
for (int i = 0; i < 100; i++) {
threadPoolExecutor.execute(() -> {
long start = System.currentTimeMillis();
int a = new Random().nextInt(100);
int b = new Random().nextInt(100);
int c = helloWorld.add(a, b);
// .add 操作就是屏蔽了所有的细节,提供给客户端使用的方法
System.out.println("a: " + a + ", b:" + b + ", c=" + c + ", 耗时:" + (System.currentTimeMillis() - start));
});
}
} catch (Exception e) {
throw new RuntimeException("客户端执行出错:{}", e);
} finally {
threadPoolExecutor.shutdown();
}
}
}

服务接口

1
2
3
4
5
6
7
8
9
10
package com.springboot.whb.study.rpc.rpc_v1.expore;

/**
* @author: whb
* @description: 接口定义
*/
public interface HelloWorld {

int add(int a, int b);
}

服务接口实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.springboot.whb.study.rpc.rpc_v1.expore;

import java.util.Random;

/**
* @author: whb
* @description: 接口实现类
*/
public class HelloWorldImpl implements HelloWorld {

@Override
public int add(int a, int b) {
long start = System.currentTimeMillis();
try {
Thread.sleep(new Random().nextInt(10000));
// 故意添加了耗时操作,以便于模拟真实的调用操作
} catch (InterruptedException e) {
e.printStackTrace();
}
int c = a + b;
System.out.println(Thread.currentThread().getName() + " 耗时:" + (System.currentTimeMillis() - start));
return c;
}
}

运行效果

img

img

总结

这只是一个非常简单的RPC实践,包含了服务暴露、服务注册(Proxy生成)、BIO模型进行网络传输,java默认的序列化方法,对RPC有一个初步的认识和了解,知道RPC必须包含的模块。

不过还是有很多需要优化的点以改进:

  1. IO模型:使用的是BIO模型,可以改进换成NIO模型,引入netty;
  2. 池化:不要随意新建线程,所有的线程都应有线程池统一管理;
  3. 服务发现:本地模拟的小demo,并没有服务发现,可以采用zk管理;
  4. 序列化:java本身自带的序列化效率很低,可以换成Hessian(DUBBO默认采用其作为序列化工具)、Protobuf(Protobuf是由Google提出的一种支持多语言的跨平台的序列化框架)等;
  5. 还有例如服务统计、优雅下线、负载均衡等也都是一个成熟的RPC框架必须要考虑到的点。

本文标题:手写RPC--简易版

文章作者:王洪博

发布时间:2019年04月25日 - 16:04

最后更新:2019年09月12日 - 10:09

原始链接:http://whb1990.github.io/posts/abff3a44.html

▄︻┻═┳一如果你喜欢这篇文章,请点击下方"打赏"按钮请我喝杯 ☕
0%