0%

手写RPC框架

手写RPC框架

简易版本RPC开发

基本概念

什么是RPC

RPC:Remote Procedure Call即远程过程调用,是一种计算机通信协议,允许程序在不同计算机之间进行通信和交互,就像本地调用一样。

为什么需要RPC

  • 透明性:RPC 提供了一种机制,使得远程服务调用看起来就像本地方法调用一样。开发者不需要关心底层的网络通信细节,可以专注于业务逻辑。
  • 封装性:RPC 框架通常会处理序列化、反序列化、网络传输等复杂操作,将这些细节从应用程序中抽象出来。
  • 模块化:通过将服务拆分为多个独立的服务,并使用 RPC 进行通信,可以实现更好的模块化。每个服务都可以独立开发、部署和扩展。
  • 解耦合:服务之间的依赖关系可以通过接口定义来管理,而不是直接的代码依赖。这使得系统的各个部分更加松散耦合,更容易进行修改和升级。

RPC框架实现思路

基本设计

假设有消费者和服务提供者两个角色

image-20241015192036350

消费者想要调用提供者,就需要提供者启动一个web服务,然后通过请求客户端发送HTTP请求或者其他协议来调用。

若有多个服务和方法,每个接口都要单独写一个接口,则较为麻烦。

则提供一个统一的服务调用接口,通过请求处理器根据客户端的请求参数来进行不同的处理、调用不同的服务和方法。

可以在服务提供者维护一个本地服务注册器,记录服务和对应实现类的映射。

image-20241015194556974

另外,需要注意的是,java对象在网络中无法直接传输,所以需要对传输的参数进行序列化和反序列化。

所以,一个简易的RPC框架就此诞生。

image-20241015194719132

开发简易版RPC框架

简易版实现几个模块:

  • example-common:示例代码的公共依赖,包括接口、model等
  • example-consumer:示例服务消费者代码
  • example-provide:示例服务t提供者代码
  • lkj-rpc-easy:简易版rpc框架

公共模块

  • 编写实体类User
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.lkj.example.common.model;

import java.io.Serializable;


public class User implements Serializable {

private String name;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}

对象需要实习序列化接口,为后续网络传输序列化提供支持

  • 编写用户服务接口UserService,提供一个获取用户的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.lkj.example.common.service;

import com.lkj.example.common.model.User;


public interface UserService {

/**
* 获取用户
*
* @param user
* @return
*/
User getUser(User user);


}

服务提供者

  • 编写服务实现类,实现公共模块中定义的用户服务接口

功能是打印用户的名称,并且返回参数中的用户对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.lkj.example.provider;

import com.lkj.example.common.model.User;
import com.lkj.example.common.service.UserService;


public class UserServiceImpl implements UserService {

public User getUser(User user) {
System.out.println("用户名:" + user.getName());
return user;
}
}

  • 编写服务提供者启动类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.lkj.example.provider;

import com.lkj.example.common.service.UserService;
import com.lkj.yurpc.registry.LocalRegistry;
import com.lkj.yurpc.server.HttpServer;
import com.lkj.yurpc.server.VertxHttpServer;


public class EasyProviderExample {

public static void main(String[] args) {
// 注册服务
LocalRegistry.register(UserService.class.getName(), UserServiceImpl.class);

// 启动 web 服务
HttpServer httpServer = new VertxHttpServer();
httpServer.doStart(8080);
}
}

服务消费者

  • 创建服务消费者启动类,编写调用接口的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
 package com.lkj.example.consumer;

import com.lkj.example.common.model.User;
import com.lkj.example.common.service.UserService;
import com.lkj.yurpc.proxy.ServiceProxyFactory;


public class EasyConsumerExample {

public static void main(String[] args) {

UserService userService = null;
User user = new User();
user.setName("lkj");
// 调用
User newUser = userService.getUser(user);
if (newUser != null) {
System.out.println(newUser.getName());
} else {
System.out.println("user == null");
}
}
}

这里我们还没有获取userService实例,所以定义为null

web服务器

要让服务提供者提供可远程访问的服务,那么就需要一个web服务器,接受处理并返回请求。

此处使用高性能的NIO框架Vert.x作为RPC框架的web服务器。

  • 编写一个web服务器的接口HttpServer,定义统一的启动服务器方法
1
2
3
4
5
6
7
8
9
10
11
12
13
package com.lkj.yurpc.server;


public interface HttpServer {

/**
* 启动服务器
*
* @param port
*/
void doStart(int port);
}

  • 编写基于Vert.x的web服务器VertxHttpServer,能够箭筒指定端口并处理请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.lkj.yurpc.server;

import io.vertx.core.Vertx;


public class VertxHttpServer implements HttpServer {

/**
* 启动服务器
*
* @param port
*/
public void doStart(int port) {
// 创建 Vert.x 实例
Vertx vertx = Vertx.vertx();

// 创建 HTTP 服务器
io.vertx.core.http.HttpServer server = vertx.createHttpServer();

// 监听端口并处理请求
server.requestHandler(new HttpServerHandler());

// 启动 HTTP 服务器并监听指定端口
server.listen(port, result -> {
if (result.succeeded()) {
System.out.println("Server is now listening on port " + port);
} else {
System.err.println("Failed to start server: " + result.cause());
}
});
}
}

本地服务注册器

在简易版本中,暂时不用第三方注册中心,直接把服务注册到服务提供者本地。

  • 使用线程安全的ConcurrentHashMap存储服务注册信息,key为服务名称,value为服务的实现类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.lkj.yurpc.registry;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


public class LocalRegistry {

/**
* 注册信息存储
*/
private static final Map<String, Class<?>> map = new ConcurrentHashMap<>();

/**
* 注册服务
*
* @param serviceName
* @param implClass
*/
public static void register(String serviceName, Class<?> implClass) {
map.put(serviceName, implClass);
}

/**
* 获取服务
*
* @param serviceName
* @return
*/
public static Class<?> get(String serviceName) {
return map.get(serviceName);
}

/**
* 删除服务
*
* @param serviceName
*/
public static void remove(String serviceName) {
map.remove(serviceName);
}
}

序列化器

  • 序列化:将java对象转为可传输的字节数组
  • 反序列化:将字节数组转换为可传输的java对象

为了方便此处选择java原生的序列化器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.lkj.yurpc.serializer;

import java.io.*;


public class JdkSerializer implements Serializer {

/**
* 序列化
*
* @param object
* @param <T>
* @return
* @throws IOException
*/
@Override
public <T> byte[] serialize(T object) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
objectOutputStream.writeObject(object);
objectOutputStream.close();
return outputStream.toByteArray();
}

/**
* 反序列化
*
* @param bytes
* @param type
* @param <T>
* @return
* @throws IOException
*/
@Override
public <T> T deserialize(byte[] bytes, Class<T> type) throws IOException {
ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
try {
return (T) objectInputStream.readObject();
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
} finally {
objectInputStream.close();
}
}
}

提供者调用-请求处理器

  • RpcRequest的作用是封装调用所需的信息,比如服务名称,方法名称,调用参数的类型列表,参数列表。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.lkj.yurpc.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;


@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RpcRequest implements Serializable {

/**
* 服务名称
*/
private String serviceName;

/**
* 方法名称
*/
private String methodName;

/**
* 参数类型列表
*/
private Class<?>[] parameterTypes;

/**
* 参数列表
*/
private Object[] args;

}
  • 响应类RpcResponse的作用是封装调用方法得到的返回值、以及调用的信息等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.lkj.yurpc.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;


@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RpcResponse implements Serializable {

/**
* 响应数据
*/
private Object data;

/**
* 响应数据类型(预留)
*/
private Class<?> dataType;

/**
* 响应信息
*/
private String message;

/**
* 异常信息
*/
private Exception exception;

}
  • 编写请求处理器 HttpServerHandler

业务流程如下:

1.反序列化请求为对象,并从请求对象中获取参数

2.根据服务名称从本地注册器中获取到对应的服务实现类

3.通过反射机制调用方法,得到返回结果

4.对返回结果进行封装和序列化,并写入响应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package com.lkj.yurpc.server;

import com.lkj.yurpc.model.RpcRequest;
import com.lkj.yurpc.model.RpcResponse;
import com.lkj.yurpc.registry.LocalRegistry;
import com.lkj.yurpc.serializer.JdkSerializer;
import com.lkj.yurpc.serializer.Serializer;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;

import java.io.IOException;
import java.lang.reflect.Method;


public class HttpServerHandler implements Handler<HttpServerRequest> {

@Override
public void handle(HttpServerRequest request) {
// 指定序列化器
final Serializer serializer = new JdkSerializer();

// 记录日志
System.out.println("Received request: " + request.method() + " " + request.uri());

// 异步处理 HTTP 请求
request.bodyHandler(body -> {
byte[] bytes = body.getBytes();
RpcRequest rpcRequest = null;
try {
rpcRequest = serializer.deserialize(bytes, RpcRequest.class);
} catch (Exception e) {
e.printStackTrace();
}

// 构造响应结果对象
RpcResponse rpcResponse = new RpcResponse();
// 如果请求为 null,直接返回
if (rpcRequest == null) {
rpcResponse.setMessage("rpcRequest is null");
doResponse(request, rpcResponse, serializer);
return;
}

try {
// 获取要调用的服务实现类,通过反射调用
Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());
Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());
// 封装返回结果
rpcResponse.setData(result);
rpcResponse.setDataType(method.getReturnType());
rpcResponse.setMessage("ok");
} catch (Exception e) {
e.printStackTrace();
rpcResponse.setMessage(e.getMessage());
rpcResponse.setException(e);
}
// 响应
doResponse(request, rpcResponse, serializer);
});
}

/**
* 响应
*
* @param request
* @param rpcResponse
* @param serializer
*/
void doResponse(HttpServerRequest request, RpcResponse rpcResponse, Serializer serializer) {
HttpServerResponse httpServerResponse = request.response()
.putHeader("content-type", "application/json");
try {
// 序列化
byte[] serialized = serializer.serialize(rpcResponse);
httpServerResponse.end(Buffer.buffer(serialized));
} catch (IOException e) {
e.printStackTrace();
httpServerResponse.end(Buffer.buffer());
}
}
}

消费方发起调用-代理

代理的实现方式大致分为2类:静态代理和动态代理

静态代理

静态代理是指为每一个特定类型的接口或对象,编写一个代理类。

静态代理虽然很好理解,但缺点也很明显,如果给每个服务接口都写一个实现类,是非常麻烦的,代理方式灵活性很差。

所以RPC框架中,会使用动态代理。

动态代理

动态代理的作用是,根据要生成的对象类型,自动生成一个代理对象。

常用的动态代理实现方式有JDK动态代理和基于字节码生成的动态代理。前者简单易用、无需引入额外的库,但缺点是只能对接口进行代理;后者更灵活、可以对任何类进行代理,但性能略低于JDK动态代理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.lkj.yurpc.proxy;

import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.lkj.yurpc.model.RpcRequest;
import com.lkj.yurpc.model.RpcResponse;
import com.lkj.yurpc.serializer.JdkSerializer;
import com.lkj.yurpc.serializer.Serializer;

import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;


public class ServiceProxy implements InvocationHandler {

/**
* 调用代理
*
* @return
* @throws Throwable
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 指定序列化器
Serializer serializer = new JdkSerializer();

// 构造请求
RpcRequest rpcRequest = RpcRequest.builder()
.serviceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.parameterTypes(method.getParameterTypes())
.args(args)
.build();
try {
// 序列化
byte[] bodyBytes = serializer.serialize(rpcRequest);
// 发送请求
// todo 注意,这里地址被硬编码了(需要使用注册中心和服务发现机制解决)
try (HttpResponse httpResponse = HttpRequest.post("http://localhost:8080")
.body(bodyBytes)
.execute()) {
byte[] result = httpResponse.bodyBytes();
// 反序列化
RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class);
return rpcResponse.getData();
}
} catch (IOException e) {
e.printStackTrace();
}

return null;
}
}

当用户调用某个接口的方法时,会改为调用invoke方法。在invoke方法中,我们可以获取到要调用的方法信息,传入的参数列表等。

需要注意的时,上述代码中,请求的服务提供者地址被硬编码了,需要使用注册中心和服务发现机制来解决。

  • 创建动态代理工厂ServiceProxyFactory,作用是根据指定类创建动态代理对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.lkj.yurpc.proxy;

import java.lang.reflect.Proxy;


public class ServiceProxyFactory {

/**
* 根据服务类获取代理对象
*
* @param serviceClass
* @param <T>
* @return
*/
public static <T> T getProxy(Class<T> serviceClass) {
return (T) Proxy.newProxyInstance(
serviceClass.getClassLoader(),
new Class[]{serviceClass},
new ServiceProxy());
}
}

简易测试

image-20241015222428567

image-20241015222646726

image-20241015222659479

静态代理的调试过程

  • 直接跳到20行,开始调用getuser

image-20241016142105817

  • 进入UserServiceProxy的getUser方法

image-20241016142226322

  • 执行到http请求,进入拦截器部分,即第35行

image-20241016142257749

  • 开始执行拦截器,进行反射调用返回结果,中间进行序列化反序列化等操作

image-20241016142426358

  • 返回代理中,得到结果

image-20241016143828389

动态代理调试过程

  • InvocationHandler 接口ServiceProxy 类实现了 InvocationHandler 接口,这是 Java 动态代理的核心接口。

  • invoke 方法invoke 方法是 InvocationHandler 接口中的唯一方法。这个方法会在每次通过代理对象调用方法时被调用。

    invoke方法接收三个参数:

    • proxy:代理对象本身。
    • method:被调用的方法。
    • args:方法的参数。

动态代理的工作流程

  1. 构造请求invoke 方法首先构建一个 RpcRequest 对象,包含服务名、方法名、参数类型和参数值。
  2. 序列化请求:将 RpcRequest 对象序列化为字节数组。
  3. 发送请求:使用 HTTP 请求库(如 Hutool)将序列化的请求发送到指定的服务器地址(这里是硬编码的 http://localhost:8080)。
  4. 接收响应:从服务器接收响应,并将其反序列化为 RpcResponse 对象。
  5. 返回结果:从 RpcResponse 中提取数据并返回给调用者。

与静态代理的区别

静态代理

  • 手动创建代理类:静态代理需要为每个接口手动创建一个代理类,代理类实现了相同的接口。
  • 编译时确定:静态代理在编译时就已经确定,代理类的代码是固定的。
  • 灵活性差:如果接口发生变化,需要手动修改代理类,维护成本高。

动态代理

  • 自动生成代理类:动态代理在运行时自动生成代理类,不需要手动编写代理类。
  • 运行时确定:动态代理在运行时根据实际调用的方法动态生成代理类。
  • 灵活性高:动态代理可以灵活地添加或修改拦截逻辑,适用于多种场景,特别是当接口经常变化时。

由于动态代理中,invoke方法会在每次通过代理对象调用方法时被调用。拦截器调用次数多,调试较为繁琐,所以不做图片展示。

全局配置加载

需求分析

在RPC框架的运行中,会涉及到很多的配置信息,比如注册中心的地址,序列化方式,网络服务器端口号等等。

在简易版本的RPC项目中,我们采用了硬编码的方式,不利于维护。

同时PRC框架是需要被其他项目作为提供者或消费者引入的,所以我们应当允许引入框架的项目编写配置文件来自定义配置。

因此,需要配置一套全局配置加载功能。

项目实现

配置加载

  • 新建RpcConfig,用于保存配置信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.lkj.yurpc.config;


import lombok.Data;


@Data
public class RpcConfig {

/**
* 名称
*/
private String name = "yu-rpc";

/**
* 版本号
*/
private String version = "1.0";

/**
* 服务器主机名
*/
private String serverHost = "localhost";

/**
* 服务器端口号
*/
private Integer serverPort = 8080;
}
  • 新建工具类ConfigUtils,作用是读配置文件并返回配置对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.lkj.yurpc.utils;

import cn.hutool.core.util.StrUtil;
import cn.hutool.setting.dialect.Props;


public class ConfigUtils {

/**
* 加载配置对象
*
* @param tClass
* @param prefix
* @param <T>
* @return
*/
public static <T> T loadConfig(Class<T> tClass, String prefix) {
return loadConfig(tClass, prefix, "");
}

/**
* 加载配置对象,支持区分环境
*
* @param tClass
* @param prefix
* @param environment
* @param <T>
* @return
*/
public static <T> T loadConfig(Class<T> tClass, String prefix, String environment) {
StringBuilder configFileBuilder = new StringBuilder("application");
if (StrUtil.isNotBlank(environment)) {
configFileBuilder.append("-").append(environment);
}
configFileBuilder.append(".properties");
Props props = new Props(configFileBuilder.toString());
return props.toBean(tClass, prefix);
}
}

  • 新建RpcConstant接口,用于存储RPC框架相关常量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.lkj.yurpc.constant;

/**
* RPC 相关常量

*/
public interface RpcConstant {

/**
* 默认配置文件加载前缀
*/
String DEFAULT_CONFIG_PREFIX = "rpc";

/**
* 默认服务版本
*/
String DEFAULT_SERVICE_VERSION = "1.0";

}

维护全局配置对象

RPC框架中需要维护一个全局的配置对象,在引入RPC框架的项目启动时,从配置文件中读取配置并创建对象实例,之后就可以集中的从这个对象获取配置信息,而不是每次加载配置时重新读取配置,并创建新的对象,减少了性能开销。

使用设计模式中的单例模式。

单例模式(Singleton Pattern)是设计模式中的一种创建型模式,它确保一个类只有一个实例,并提供一个全局访问点。单例模式的主要目的是控制共享资源的访问,例如数据库连接或线程池。

单例模式的特点

  1. 唯一实例:确保一个类只有一个实例。
  2. 全局访问点:提供一个全局的访问点来获取这个唯一的实例。
  3. 延迟初始化:可以实现懒加载(Lazy Initialization),即在第一次使用时才创建实例。
  4. 线程安全:在多线程环境下,必须保证单例的创建是线程安全的。

一般情况下,我们会使用holder来维护全局配置对象实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.lkj.yurpc;

import com.lkj.yurpc.config.RpcConfig;
import com.lkj.yurpc.constant.RpcConstant;
import com.lkj.yurpc.utils.ConfigUtils;
import lombok.extern.slf4j.Slf4j;

/**
* RPC 框架应用
* 相当于 holder,存放了项目全局用到的变量。双检锁单例模式实现
*/
@Slf4j
public class RpcApplication {

private static volatile RpcConfig rpcConfig;

/**
* 框架初始化,支持传入自定义配置
*
* @param newRpcConfig
*/
public static void init(RpcConfig newRpcConfig) {
rpcConfig = newRpcConfig;
log.info("rpc init, config = {}", newRpcConfig.toString());
}

/**
* 初始化
*/
public static void init() {
RpcConfig newRpcConfig;
try {
newRpcConfig = ConfigUtils.loadConfig(RpcConfig.class, RpcConstant.DEFAULT_CONFIG_PREFIX);
} catch (Exception e) {
// 配置加载失败,使用默认值
newRpcConfig = new RpcConfig();
}
init(newRpcConfig);
}

/**
* 获取配置
*
* @return
*/
public static RpcConfig getRpcConfig() {
if (rpcConfig == null) {
synchronized (RpcApplication.class) {
if (rpcConfig == null) {
init();
}
}
}
return rpcConfig;
}
}

测试

在example-consumer项目的resourses目录编写配置文件application.properties

1
2
3
4
rpc.name=yurpc
rpc.version=2.0
rpc.mock=false
rpc.serializer=jdk

创建ConsumerExample作为扩展后RPC项目的示例消费者类,测试配置文件读取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import com.lkj.yurpc.config.RpcConfig;
import com.lkj.yurpc.utils.ConfigUtils;

/**
* 简易服务消费者示例
*

*/
public class ConsumerExample {

public static void main(String[] args) {
RpcConfig rpc = ConfigUtils.loadConfig(RpcConfig.class, "rpc");
System.out.println(rpc);

}
}

image-20241017153826043

接口Mock

需求分析

什么是Mock

RPC框架的核心功能是调用其他远程服务。但是在实际开发和测试过程中,有时候可能无法访问真实的远程服务,并且远程服务可能产生不可控的影响,例如网络延迟,服务不稳定等。在这种情况下,就需要使用mock服务来模拟远程服务的行为,以便进行接口的测试开发和调试。

mock是指模拟对象,通常用于测试代码中,特别是在单元测试中,便于我们跑通业务流程。

设计方案

通过动态代理创建一个调用方法时返回固定值的对象。

开发实现

  • 首先给全局配置类RpcConfig新增mock字段,默认值为false
  • 在Proxy包下新增MockServiceProxy类,用于生成mock代理方法。

在这个类中,需要提供一个根据服务接口类型返回固定值的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.lkj.yurpc.proxy;

import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

/**
* Mock 服务代理(JDK 动态代理)

*/
@Slf4j
public class MockServiceProxy implements InvocationHandler {

/**
* 调用代理
*
* @return
* @throws Throwable
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 根据方法的返回值类型,生成特定的默认值对象
Class<?> methodReturnType = method.getReturnType();
log.info("mock invoke {}", method.getName());
return getDefaultObject(methodReturnType);
}

/**
* 生成指定类型的默认值对象(可自行完善默认值逻辑)
*
* @param type
* @return
*/
private Object getDefaultObject(Class<?> type) {
// 基本类型
if (type.isPrimitive()) {
if (type == boolean.class) {
return false;
} else if (type == short.class) {
return (short) 0;
} else if (type == int.class) {
return 0;
} else if (type == long.class) {
return 0L;
}
}
// 对象类型
return null;
}
}

  • 给ServiceProxyFactory服务代理工厂新增获取mock代理对象的方法getMockProxy。
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 根据服务类获取 Mock 代理对象
*
* @param serviceClass
* @param <T>
* @return
*/
public static <T> T getMockProxy(Class<T> serviceClass) {
return (T) Proxy.newProxyInstance(
serviceClass.getClassLoader(),
new Class[]{serviceClass},
new MockServiceProxy());
}

测试

  • 在UserService中写个具有默认实现的新方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.lkj.example.common.service;

import com.lkj.example.common.model.User;


public interface UserService {

/**
* 获取用户
*
* @param user
* @return
*/
User getUser(User user);

/**
* 新方法 - 获取数字
*/
default short getNumber() {
return 1;
}

}

  • 修改ConsumerExample类,编写调用的新方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ConsumerExample {

public static void main(String[] args) {
//RpcConfig rpc = ConfigUtils.loadConfig(RpcConfig.class, "rpc");
//System.out.println(rpc);
// 获取代理
UserService userService = ServiceProxyFactory.getProxy(UserService.class);
User user = new User();
user.setName("lkj");
// 调用
User newUser = userService.getUser(user);
if (newUser != null) {
System.out.println(newUser.getName());
} else {
System.out.println("user == null");
}
long number = userService.getNumber();
System.out.println(number);

}
}

可以看到输出的结果为0,而不是1,说明调用了模拟服务代理。

image-20241017193602008

SPI

  • SPI(Service Provider Interface) 服务提供接口是Java的机制,主要用于实现模块化开发和插件化开发扩展。
  • SPI机制允许服务提供者通过特定的配置文件将自己的实现注册到系统,然后系统通过反射机制动态加载这些实现,而不需要修改原始框架的代码,从而实现了系统的解耦、提高了可扩展性。

一个典型的SPI应用场景是JDBC

  • 编写SpiLoder加载器

相当于一个工具类,提供了读取配置并加载实现类的方法。

关键实现如下:

  • 用map来存储已知的配置信息,键名->实现类
  • 扫描指定路径,读取每个配置文件,获取到键名->实现类信息并存储在map中
  • 定义获取实例方法,根据用户传入的接口和键名,从map中寻找对应的实现类,然后通过反射获取到实现类对象。可以维护一个对象实例缓存,创建过一次的对象从缓存中读取即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package com.lkj.yurpc.spi;

import cn.hutool.core.io.resource.ResourceUtil;
import com.lkj.yurpc.serializer.Serializer;
import lombok.extern.slf4j.Slf4j;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* SPI 加载器
* 自定义实现,支持键值对映射
*

*/
@Slf4j
public class SpiLoader {

/**
* 存储已加载的类:接口名 =>(key => 实现类)
*/
private static final Map<String, Map<String, Class<?>>> loaderMap = new ConcurrentHashMap<>();

/**
* 对象实例缓存(避免重复 new),类路径 => 对象实例,单例模式
*/
private static final Map<String, Object> instanceCache = new ConcurrentHashMap<>();

/**
* 系统 SPI 目录
*/
private static final String RPC_SYSTEM_SPI_DIR = "META-INF/rpc/system/";

/**
* 用户自定义 SPI 目录
*/
private static final String RPC_CUSTOM_SPI_DIR = "META-INF/rpc/custom/";

/**
* 扫描路径
*/
private static final String[] SCAN_DIRS = new String[]{RPC_SYSTEM_SPI_DIR, RPC_CUSTOM_SPI_DIR};

/**
* 动态加载的类列表
*/
private static final List<Class<?>> LOAD_CLASS_LIST = Arrays.asList(Serializer.class);

/**
* 加载所有类型
*/
public static void loadAll() {
log.info("加载所有 SPI");
for (Class<?> aClass : LOAD_CLASS_LIST) {
load(aClass);
}
}

/**
* 获取某个接口的实例
*
* @param tClass
* @param key
* @param <T>
* @return
*/
public static <T> T getInstance(Class<?> tClass, String key) {
String tClassName = tClass.getName();
Map<String, Class<?>> keyClassMap = loaderMap.get(tClassName);
if (keyClassMap == null) {
throw new RuntimeException(String.format("SpiLoader 未加载 %s 类型", tClassName));
}
if (!keyClassMap.containsKey(key)) {
throw new RuntimeException(String.format("SpiLoader 的 %s 不存在 key=%s 的类型", tClassName, key));
}
// 获取到要加载的实现类型
Class<?> implClass = keyClassMap.get(key);
// 从实例缓存中加载指定类型的实例
String implClassName = implClass.getName();
if (!instanceCache.containsKey(implClassName)) {
try {
instanceCache.put(implClassName, implClass.newInstance());
} catch (InstantiationException | IllegalAccessException e) {
String errorMsg = String.format("%s 类实例化失败", implClassName);
throw new RuntimeException(errorMsg, e);
}
}
return (T) instanceCache.get(implClassName);
}

/**
* 加载某个类型
*
* @param loadClass
* @throws IOException
*/
public static Map<String, Class<?>> load(Class<?> loadClass) {
log.info("加载类型为 {} 的 SPI", loadClass.getName());
// 扫描路径,用户自定义的 SPI 优先级高于系统 SPI
Map<String, Class<?>> keyClassMap = new HashMap<>();
for (String scanDir : SCAN_DIRS) {
List<URL> resources = ResourceUtil.getResources(scanDir + loadClass.getName());
// 读取每个资源文件
for (URL resource : resources) {
try {
InputStreamReader inputStreamReader = new InputStreamReader(resource.openStream());
BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
String line;
while ((line = bufferedReader.readLine()) != null) {
String[] strArray = line.split("=");
if (strArray.length > 1) {
String key = strArray[0];
String className = strArray[1];
keyClassMap.put(key, Class.forName(className));
}
}
} catch (Exception e) {
log.error("spi resource load error", e);
}
}
}
loaderMap.put(loadClass.getName(), keyClassMap);
return keyClassMap;
}

public static void main(String[] args) throws IOException, ClassNotFoundException {
loadAll();
System.out.println(loaderMap);
Serializer serializer = getInstance(Serializer.class, "e");
System.out.println(serializer);
}

}

注册中心基本实现

需求分析

RPC框架的一个核心模块式注册中心,其目的是帮助服务消费者获取到服务提供者的调用地址,而不是把调用地址硬编码到项目中

image-20241020182959528

设计方案

注册中心核心功能

  • 数据分布式存储:集中的注册信息数据存储、读取和共享
  • 服务注册:服务提供者上报服务信息到注册中心
  • 服务发现:服务消费者从注册中心拉取服务信息
  • 心跳检测:定期检查服务提供者的存活状态
  • 服务注销:手动剔除节点,或者自动删除失效节点

技术选型

  • 需要一个能够集中存储和读取数据的中间件。还需要有数据过期,数据监听的能力,使我们移除失效节点,更新节点列表。

ETCD

数据结构

采用层次化的键值对来存储数据,支持类似于文件系统路径的层次结构

核心结构:

  • key:Etcd的基本数据单元,类似于文件系统的文件名。每个键都唯一标识一个值,并且可以包含子键,形成于类似路径的层次结构。
  • value:与键关联的数据,通常为字符型

ETCD核心特性:

  • Lease(租约):用于对键值对进行TTL超时设置,即设置键值对的过期时间。
  • Watch(监听):可以监视特定键的变化,当键的值发生变化时,会触发相应通知。

开发实现

注册中心开发

  • 注册信息定义

在model包下新建ServiceMetaInfo类,封装服务的注册信息,包括服务名称、服务版本号、服务地址和服务分组等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.lkj.yurpc.model;

import cn.hutool.core.util.StrUtil;
import com.lkj.yurpc.constant.RpcConstant;
import lombok.Data;

/**
* 服务元信息(注册信息)
*

*/
@Data
public class ServiceMetaInfo {

/**
* 服务名称
*/
private String serviceName;

/**
* 服务版本号
*/
private String serviceVersion = RpcConstant.DEFAULT_SERVICE_VERSION;

/**
* 服务域名
*/
private String serviceHost;

/**
* 服务端口号
*/
private Integer servicePort;

/**
* 服务分组(暂未实现)
*/
private String serviceGroup = "default";
/**
* 获取服务键名
*
* @return
*/
public String getServiceKey() {
// 后续可扩展服务分组
// return String.format("%s:%s:%s", serviceName, serviceVersion, serviceGroup);
return String.format("%s:%s", serviceName, serviceVersion);
}

/**
* 获取服务注册节点键名
*
* @return
*/
public String getServiceNodeKey() {
return String.format("%s/%s:%s", getServiceKey(), serviceHost, servicePort);
}

/**
* 获取完整服务地址
*
* @return
*/
public String getServiceAddress() {
if (!StrUtil.contains(serviceHost, "http")) {
return String.format("http://%s:%s", serviceHost, servicePort);
}
return String.format("%s:%s", serviceHost, servicePort);
}

  • 注册中心配置

在config包下编写注册中心配置类RegistryConfig,让用户配置连接注册中心所需的信息,比如注册中心类别、注册中心地址、用户名、密码、连接超时等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.lkj.yurpc.config;

//import com.lkj.yurpc.registry.RegistryKeys;
import lombok.Data;

/**
* RPC 框架注册中心配置
*

*/
@Data
public class RegistryConfig {

/**
* 注册中心类别
*/
private String registry = "etcd";

/**
* 注册中心地址
*/
private String address = "http://localhost:2380";

/**
* 用户名
*/
private String username;

/**
* 密码
*/
private String password;

/**
* 超时时间(单位毫秒)
*/
private Long timeout = 10000L;
}

  • 注册中心接口

遵循可扩展机制,先写一个注册中心接口,后续可以实现多种不同的注册中心,使用SPI机制动态加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.lkj.yurpc.registry;

import com.lkj.yurpc.config.RegistryConfig;
import com.lkj.yurpc.model.ServiceMetaInfo;

import java.util.List;

/**
* 注册中心
*

*/
public interface Registry {

/**
* 初始化
*
* @param registryConfig
*/
void init(RegistryConfig registryConfig);

/**
* 注册服务(服务端)
*
* @param serviceMetaInfo
*/
void register(ServiceMetaInfo serviceMetaInfo) throws Exception;

/**
* 注销服务(服务端)
*
* @param serviceMetaInfo
*/
void unRegister(ServiceMetaInfo serviceMetaInfo);

/**
* 服务发现(获取某服务的所有节点,消费端)
*
* @param serviceKey 服务键名
* @return
*/
List<ServiceMetaInfo> serviceDiscovery(String serviceKey);

/**
* 心跳检测(服务端)
*/
//void heartBeat();

/**
* 监听(消费端)
*
* @param serviceNodeKey
*/
// void watch(String serviceNodeKey);

/**
* 服务销毁
*/
void destroy();
}
  • etcd注册中心实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package com.lkj.yurpc.registry;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.cron.CronUtil;
import cn.hutool.cron.task.Task;
import cn.hutool.json.JSONUtil;
import com.lkj.yurpc.config.RegistryConfig;
import com.lkj.yurpc.model.ServiceMetaInfo;
import io.etcd.jetcd.*;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class EtcdRegistry implements Registry {

private Client client;

private KV kvClient;

/**
* 根节点
*/
private static final String ETCD_ROOT_PATH = "/rpc/";

@Override
public void init(RegistryConfig registryConfig) {
client = Client.builder().endpoints(registryConfig.getAddress()).connectTimeout(Duration.ofMillis(registryConfig.getTimeout())).build();
kvClient = client.getKVClient();
}

@Override
public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {
// 创建 Lease 和 KV 客户端
Lease leaseClient = client.getLeaseClient();

// 创建一个 30 秒的租约
long leaseId = leaseClient.grant(30).get().getID();

// 设置要存储的键值对
String registerKey = ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey();
ByteSequence key = ByteSequence.from(registerKey, StandardCharsets.UTF_8);
ByteSequence value = ByteSequence.from(JSONUtil.toJsonStr(serviceMetaInfo), StandardCharsets.UTF_8);

// 将键值对与租约关联起来,并设置过期时间
PutOption putOption = PutOption.builder().withLeaseId(leaseId).build();
kvClient.put(key, value, putOption).get();
}

@Override
public void unRegister(ServiceMetaInfo serviceMetaInfo) {
kvClient.delete(ByteSequence.from(ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey(), StandardCharsets.UTF_8));
}

@Override
public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) {
// 前缀搜索,结尾一定要加 '/'
String searchPrefix = ETCD_ROOT_PATH + serviceKey + "/";

try {
// 前缀查询
GetOption getOption = GetOption.builder().isPrefix(true).build();
List<KeyValue> keyValues = kvClient.get(
ByteSequence.from(searchPrefix, StandardCharsets.UTF_8),
getOption)
.get()
.getKvs();
// 解析服务信息
return keyValues.stream()
.map(keyValue -> {
String value = keyValue.getValue().toString(StandardCharsets.UTF_8);
return JSONUtil.toBean(value, ServiceMetaInfo.class);
})
.collect(Collectors.toList());
} catch (Exception e) {
throw new RuntimeException("获取服务列表失败", e);
}
}

@Override
public void destroy() {
System.out.println("当前节点下线");
// 释放资源
if (kvClient != null) {
kvClient.close();
}
if (client != null) {
client.close();
}
}
}

支持配置和扩展注册中心

一个成熟的rpc框架可能会支持多个注册中心,我们的需求是,让开发者能够填写配置来指定使用的注册中心,并且支持自定义注册中心

  • 注册中心常量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.lkj.yurpc.registry;

/**
* 注册中心键名常量
*

*/
public interface RegistryKeys {

String ETCD = "etcd";

String ZOOKEEPER = "zookeeper";

}

  • 工厂模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class RegistryFactory {

// SPI 动态加载
static {
SpiLoader.load(Registry.class);
}

/**
* 默认注册中心
*/
private static final Registry DEFAULT_REGISTRY = new EtcdRegistry();

/**
* 获取实例
*
* @param key
* @return
*/
public static Registry getInstance(String key) {
return SpiLoader.getInstance(Registry.class, key);
}

}
  • 修改ServiceProxy类
  • [ ] ```
    package com.lkj.yurpc.proxy;

    import cn.hutool.core.collection.CollUtil;
    import cn.hutool.http.HttpRequest;
    import cn.hutool.http.HttpResponse;
    import com.lkj.yurpc.RpcApplication;
    import com.lkj.yurpc.config.RpcConfig;
    import com.lkj.yurpc.constant.RpcConstant;
    import com.lkj.yurpc.model.RpcRequest;
    import com.lkj.yurpc.model.RpcResponse;
    import com.lkj.yurpc.model.ServiceMetaInfo;
    import com.lkj.yurpc.registry.Registry;
    import com.lkj.yurpc.registry.RegistryFactory;
    import com.lkj.yurpc.serializer.Serializer;
    import com.lkj.yurpc.serializer.SerializerFactory;

    import java.io.IOException;
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.util.List;

    /**

    • 服务代理(JDK 动态代理)
      *

      */
      public class ServiceProxy implements InvocationHandler {

      /**

      • 调用代理
        *
      • @return
      • @throws Throwable
        */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // 指定序列化器
        final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());

        // 构造请求
        String serviceName = method.getDeclaringClass().getName();
        RpcRequest rpcRequest = RpcRequest.builder()

             .serviceName(serviceName)
             .methodName(method.getName())
             .parameterTypes(method.getParameterTypes())
             .args(args)
             .build();
        

        try {

         // 序列化
         byte[] bodyBytes = serializer.serialize(rpcRequest);
        
         // 从注册中心获取服务提供者请求地址
         RpcConfig rpcConfig = RpcApplication.getRpcConfig();
         Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
         ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
         serviceMetaInfo.setServiceName(serviceName);
         serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
         List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());
         if (CollUtil.isEmpty(serviceMetaInfoList)) {
             throw new RuntimeException("暂无服务地址");
         }
         ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);
        
         // 发送请求
         try (HttpResponse httpResponse = HttpRequest.post(selectedServiceMetaInfo.getServiceAddress())
                 .body(bodyBytes)
                 .execute()) {
             byte[] result = httpResponse.bodyBytes();
             // 反序列化
             RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class);
             return rpcResponse.getData();
         }
        

        } catch (IOException e) {

         e.printStackTrace();
        

        }

        return null;
        }
        }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31

    ## 注册中心优化

    ### 需求分析

    之前我们完成了基础的注册中心,能够注册和获取服务和节点信息。

    对于数据一致性、性能优化、高可用性、可扩展性还有很多优化空间。

    ### 注册中心优化

    #### 心跳检测和持续机制

    ##### 介绍

    心跳检测是一种用于检测系统是否正常工作的机制。通过定期发送心跳信号来检测目标系统的状态。

    ##### 方案设计

    实现心跳检测一般需要两个关键:定时、网络请求

    因为etcd自带了key过期机制,我们可以给节点注册信息一个生命倒计时,让节点定期续期,重置自己的倒计时。如果一直不重置,则删除。

    * 服务提供者向etcd注册自己的服务信息,并在注册时设置TTL
    * etcd在收到信息后,自动维护ttl,并在ttl过期时删除
    * 服务提供者定期请求etcd续签自己的注册信息,重写ttl

    ##### 开发实现

    * 给注册中心补充心跳检测方法

    /**

    • 心跳检测(服务端)
      */
      void heartBeat();
      1
      2
      3

      * 维护续期节点集合

      /**
    • 本机注册的节点 key 集合(用于维护续期)
      */
      private final Set localRegisterNodeKeySet = new HashSet<>();

      1
      2
      3

      * 实现hearBeat方法

      @Override
      public void heartBeat() {
      // 10 秒续签一次
      CronUtil.schedule(“/10 “, new Task() {

       @Override
       public void execute() {
           // 遍历本节点所有的 key
           for (String key : localRegisterNodeKeySet) {
               try {
                   List<KeyValue> keyValues = kvClient.get(ByteSequence.from(key, StandardCharsets.UTF_8))
                           .get()
                           .getKvs();
                   // 该节点已过期(需要重启节点才能重新注册)
                   if (CollUtil.isEmpty(keyValues)) {
                       continue;
                   }
                   // 节点未过期,重新注册(相当于续签)
                   KeyValue keyValue = keyValues.get(0);
                   String value = keyValue.getValue().toString(StandardCharsets.UTF_8);
                   ServiceMetaInfo serviceMetaInfo = JSONUtil.toBean(value, ServiceMetaInfo.class);
                   register(serviceMetaInfo);
               } catch (Exception e) {
                   throw new RuntimeException(key + "续签失败", e);
               }
           }
       }
      

      });

      // 支持秒级别定时任务
      CronUtil.setMatchSecond(true);
      CronUtil.start();
      }

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20

      #### 服务节点下线机制

      当服务提供者节点宕机时,应该从注册中心移除已注册的节点。

      ##### 方案设计

      服务节点下线又分为:

      * 主动下线:服务提供者项目正常退出
      * 被动下线:异常退出

      被动下线已经实现,现在开发主动下线。

      利用JVM的ShutdownHook机制。

      ##### 开发实现

      * 完善destory

      public void destroy() {
      System.out.println(“当前节点下线”);
      // 下线节点
      // 遍历本节点所有的 key
      for (String key : localRegisterNodeKeySet) {
      try {

       kvClient.delete(ByteSequence.from(key, StandardCharsets.UTF_8)).get();
      

      } catch (Exception e) {

       throw new RuntimeException(key + "节点下线失败");
      

      }
      }

      // 释放资源
      if (kvClient != null) {
      kvClient.close();
      }
      if (client != null) {
      client.close();
      }
      }

1
2
3

* 在RpcApplication的init中,注册shutdown hook

public static void init(RpcConfig newRpcConfig) {
rpcConfig = newRpcConfig;
log.info(“rpc init, config = {}”, newRpcConfig.toString());
// 注册中心初始化
RegistryConfig registryConfig = rpcConfig.getRegistryConfig();
Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());
registry.init(registryConfig);
log.info(“registry init, config = {}”, registryConfig);

// 创建并注册 Shutdown Hook,JVM 退出时执行操作
Runtime.getRuntime().addShutdownHook(new Thread(registry::destroy));

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

## 自定义协议

### 需求分析

目前rpc框架,使用Vert.x的HttpServer作为服务提供者的服务器,底层使用的是http协议。

一般情况下,rpc框架注重性能,而HTTP协议中的头部信息,请求响应格式较重,会影响传输。

所以,我们需要自定义一套rpc协议,来实现更灵活的框架。

### 设计方案

分为两个核心部分:

* 自定义网络传输
* 自定义消息结构

#### 网络传输设计

目标:选择一个能够高性能通信的网络协议和传输方式

选择使用TCP协议完成网络传输,有更多的自主设计空间。

#### 消息结构设计

目标:用最少的空间传递需要的信息。

* 如何用最少空间:尽量使用轻量的类型,比如byte字节类型,同时每个数据凑到整个字节。
* 消息内部所需信息:

魔数:作用是安全校验,防止服务器处理非框架发送的无用信息

版本号:保证请求和响应的一致性

序列化方式:告知如何解析数据

类型:标识是请求还是响应

状态:记录响应结果

请求id:唯一标识某个请求

请求体:包含数据

请求体数据长度:保证完整的获取body内容信息

最终的消息结构设计如下图所示:

![image-20241023175713736](手写RPC框架/image-20241023175713736.png)

请求体信息总长17字节。在后续,需要有消息编码器和消息解码器,编码器先new一个空的Buffer缓冲区,然后按照顺序向缓冲区依次写入;解码器则依次读出。

### 开发设计

#### 消息结构

新建protocol包

* 新建歇息消息类,把消息头单独封装为一个内部类。

package com.lkj.yurpc.protocol;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**

  • 协议消息结构
    *

*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ProtocolMessage {

/**
 * 消息头
 */
private Header header;

/**
 * 消息体(请求或响应对象)
 */
private T body;

/**
 * 协议消息头
 */
@Data
public static class Header {

    /**
     * 魔数,保证安全性
     */
    private byte magic;

    /**
     * 版本号
     */
    private byte version;

    /**
     * 序列化器
     */
    private byte serializer;

    /**
     * 消息类型(请求 / 响应)
     */
    private byte type;

    /**
     * 状态
     */
    private byte status;

    /**
     * 请求 id
     */
    private long requestId;

    /**
     * 消息体长度
     */
    private int bodyLength;
}

}

1
2
3

* 新建协议常量类

package com.lkj.yurpc.protocol;

/**

  • 协议常量
    *

    */
    public interface ProtocolConstant {

    /**

    • 消息头长度
      */
      int MESSAGE_HEADER_LENGTH = 17;

      /**

    • 协议魔数
      */
      byte PROTOCOL_MAGIC = 0x1;

      /**

    • 协议版本号
      */
      byte PROTOCOL_VERSION = 0x1;
      }
1
2
3

* 新建消息字段的枚举类,例如:协议状态枚举,暂时只定义成功、请求失败、响应失败三种枚举值

package com.lkj.yurpc.protocol;

import lombok.Getter;

/**

  • 协议消息的状态枚举

@Getter
public enum ProtocolMessageStatusEnum {

OK("ok", 20),
BAD_REQUEST("badRequest", 40),
BAD_RESPONSE("badResponse", 50);

private final String text;

private final int value;

ProtocolMessageStatusEnum(String text, int value) {
    this.text = text;
    this.value = value;
}

/**
 * 根据 value 获取枚举
 *
 * @param value
 * @return
 */
public static ProtocolMessageStatusEnum getEnumByValue(int value) {
    for (ProtocolMessageStatusEnum anEnum : ProtocolMessageStatusEnum.values()) {
        if (anEnum.value == value) {
            return anEnum;
        }
    }
    return null;
}

}

1
2
3

* 协议信息类型枚举,包括请求响应心跳等。


package com.lkj.yurpc.protocol;

import lombok.Getter;

/**

  • 协议消息的类型枚举
    *

    */
    @Getter
    public enum ProtocolMessageTypeEnum {

    REQUEST(0),
    RESPONSE(1),
    HEART_BEAT(2),
    OTHERS(3);

    private final int key;

    ProtocolMessageTypeEnum(int key) {

     this.key = key;
    

    }

    /**

    • 根据 key 获取枚举
      *
    • @param key
    • @return
      */
      public static ProtocolMessageTypeEnum getEnumByKey(int key) {
      for (ProtocolMessageTypeEnum anEnum : ProtocolMessageTypeEnum.values()) {
       if (anEnum.key == key) {
           return anEnum;
       }
      
      }
      return null;
      }
      }
      1
      2
      3

      * 协议消息的序列化器枚举

      package com.lkj.yurpc.protocol;

import cn.hutool.core.util.ObjectUtil;
import lombok.Getter;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**

@Getter
public enum ProtocolMessageSerializerEnum {

JDK(0, "jdk"),
JSON(1, "json"),
KRYO(2, "kryo"),
HESSIAN(3, "hessian");

private final int key;

private final String value;

ProtocolMessageSerializerEnum(int key, String value) {
    this.key = key;
    this.value = value;
}

/**
 * 获取值列表
 *
 * @return
 */
public static List<String> getValues() {
    return Arrays.stream(values()).map(item -> item.value).collect(Collectors.toList());
}

/**
 * 根据 key 获取枚举
 *
 * @param key
 * @return
 */
public static ProtocolMessageSerializerEnum getEnumByKey(int key) {
    for (ProtocolMessageSerializerEnum anEnum : ProtocolMessageSerializerEnum.values()) {
        if (anEnum.key == key) {
            return anEnum;
        }
    }
    return null;
}


/**
 * 根据 value 获取枚举
 *
 * @param value
 * @return
 */
public static ProtocolMessageSerializerEnum getEnumByValue(String value) {
    if (ObjectUtil.isEmpty(value)) {
        return null;
    }
    for (ProtocolMessageSerializerEnum anEnum : ProtocolMessageSerializerEnum.values()) {
        if (anEnum.value.equals(value)) {
            return anEnum;
        }
    }
    return null;
}

}

1
2
3
4
5
6
7
8
9

#### 网络传输

新建server.tcp包,将所有TCP服务相关的代码放到该包中

* TCP服务器实现

新建VertcTcpServer类


package com.lkj.yurpc.server.tcp;

import io.vertx.core.Vertx;
import io.vertx.core.net.NetServer;
import lombok.extern.slf4j.Slf4j;

/**

  • Vertx TCP 服务器
    *

    */
    @Slf4j
    public class VertxTcpServer {

    public void doStart(int port) {

     // 创建 Vert.x 实例
     Vertx vertx = Vertx.vertx();
    
     // 创建 TCP 服务器
     NetServer server = vertx.createNetServer();
    
     // 处理请求
     server.connectHandler(new TcpServerHandler());
    
     // 启动 TCP 服务器并监听指定端口
     server.listen(port, result -> {
         if (result.succeeded()) {
             log.info("TCP server started on port " + port);
         } else {
             log.info("Failed to start TCP server: " + result.cause());
         }
     });
    

    }

    public static void main(String[] args) {

     new VertxTcpServer().doStart(8888);
    

    }
    }

1
2
3

* TCP客户端实现

package com.lkj.yurpc.server.tcp;

import cn.hutool.core.util.IdUtil;
import com.yupi.yurpc.RpcApplication;
import com.yupi.yurpc.model.RpcRequest;
import com.yupi.yurpc.model.RpcResponse;
import com.yupi.yurpc.model.ServiceMetaInfo;
import com.yupi.yurpc.protocol.*;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**

  • Vertx TCP 请求客户端
    *

    */
    public class VertxTcpClient {

    /**

    • 发送请求
      *
    • @param rpcRequest
    • @param serviceMetaInfo
    • @return
    • @throws InterruptedException
    • @throws ExecutionException
      */
      public static RpcResponse doRequest(RpcRequest rpcRequest, ServiceMetaInfo serviceMetaInfo) throws InterruptedException, ExecutionException {
      // 发送 TCP 请求
      Vertx vertx = Vertx.vertx();
      NetClient netClient = vertx.createNetClient();
      CompletableFuture responseFuture = new CompletableFuture<>();
      netClient.connect(serviceMetaInfo.getServicePort(), serviceMetaInfo.getServiceHost(),

           result -> {
               if (!result.succeeded()) {
                   System.err.println("Failed to connect to TCP server");
                   return;
               }
               NetSocket socket = result.result();
               // 发送数据
               // 构造消息
               ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<>();
               ProtocolMessage.Header header = new ProtocolMessage.Header();
               header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);
               header.setVersion(ProtocolConstant.PROTOCOL_VERSION);
               header.setSerializer((byte) ProtocolMessageSerializerEnum.getEnumByValue(RpcApplication.getRpcConfig().getSerializer()).getKey());
               header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());
               // 生成全局请求 ID
               header.setRequestId(IdUtil.getSnowflakeNextId());
               protocolMessage.setHeader(header);
               protocolMessage.setBody(rpcRequest);
      
               // 编码请求
               try {
                   Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);
                   socket.write(encodeBuffer);
               } catch (IOException e) {
                   throw new RuntimeException("协议消息编码错误");
               }
      
               // 接收响应
               TcpBufferHandlerWrapper bufferHandlerWrapper = new TcpBufferHandlerWrapper(
                       buffer -> {
                           try {
                               ProtocolMessage<RpcResponse> rpcResponseProtocolMessage =
                                       (ProtocolMessage<RpcResponse>) ProtocolMessageDecoder.decode(buffer);
                               responseFuture.complete(rpcResponseProtocolMessage.getBody());
                           } catch (IOException e) {
                               throw new RuntimeException("协议消息解码错误");
                           }
                       }
               );
               socket.handler(bufferHandlerWrapper);
      
           });
      

      RpcResponse rpcResponse = responseFuture.get();
      // 记得关闭连接
      netClient.close();
      return rpcResponse;
      }
      }

1
2
3
4
5

#### 编码解码器

* 编码器

package com.lkj.yurpc.protocol;

import com.lkj.yurpc.serializer.Serializer;
import com.lkj.yurpc.serializer.SerializerFactory;
import io.vertx.core.buffer.Buffer;

import java.io.IOException;

/**

  • 协议消息编码器

public class ProtocolMessageEncoder {

/**
 * 编码
 *
 * @param protocolMessage
 * @return
 * @throws IOException
 */
public static Buffer encode(ProtocolMessage<?> protocolMessage) throws IOException {
    if (protocolMessage == null || protocolMessage.getHeader() == null) {
        return Buffer.buffer();
    }
    ProtocolMessage.Header header = protocolMessage.getHeader();
    // 依次向缓冲区写入字节
    Buffer buffer = Buffer.buffer();
    buffer.appendByte(header.getMagic());
    buffer.appendByte(header.getVersion());
    buffer.appendByte(header.getSerializer());
    buffer.appendByte(header.getType());
    buffer.appendByte(header.getStatus());
    buffer.appendLong(header.getRequestId());
    // 获取序列化器
    ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
    if (serializerEnum == null) {
        throw new RuntimeException("序列化协议不存在");
    }
    Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
    byte[] bodyBytes = serializer.serialize(protocolMessage.getBody());
    // 写入 body 长度和数据
    buffer.appendInt(bodyBytes.length);
    buffer.appendBytes(bodyBytes);
    return buffer;
}

}

1
2
3

* 解码器

package com.lkj.yurpc.protocol;

import com.lkj.yurpc.model.RpcRequest;
import com.lkj.yurpc.model.RpcResponse;
import com.lkj.yurpc.serializer.Serializer;
import com.lkj.yurpc.serializer.SerializerFactory;
import io.vertx.core.buffer.Buffer;

import java.io.IOException;

/**

  • 协议消息解码器

    */
    public class ProtocolMessageDecoder {

    /**

    • 解码
      *
    • @param buffer
    • @return
    • @throws IOException
      */
      public static ProtocolMessage<?> decode(Buffer buffer) throws IOException {
      // 分别从指定位置读出 Buffer
      ProtocolMessage.Header header = new ProtocolMessage.Header();
      byte magic = buffer.getByte(0);
      // 校验魔数
      if (magic != ProtocolConstant.PROTOCOL_MAGIC) {
       throw new RuntimeException("消息 magic 非法");
      
      }
      header.setMagic(magic);
      header.setVersion(buffer.getByte(1));
      header.setSerializer(buffer.getByte(2));
      header.setType(buffer.getByte(3));
      header.setStatus(buffer.getByte(4));
      header.setRequestId(buffer.getLong(5));
      header.setBodyLength(buffer.getInt(13));
      // 解决粘包问题,只读指定长度的数据
      byte[] bodyBytes = buffer.getBytes(17, 17 + header.getBodyLength());
      // 解析消息体
      ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
      if (serializerEnum == null) {
       throw new RuntimeException("序列化消息的协议不存在");
      
      }
      Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
      ProtocolMessageTypeEnum messageTypeEnum = ProtocolMessageTypeEnum.getEnumByKey(header.getType());
      if (messageTypeEnum == null) {
       throw new RuntimeException("序列化消息的类型不存在");
      
      }
      switch (messageTypeEnum) {
       case REQUEST:
           RpcRequest request = serializer.deserialize(bodyBytes, RpcRequest.class);
           return new ProtocolMessage<>(header, request);
       case RESPONSE:
           RpcResponse response = serializer.deserialize(bodyBytes, RpcResponse.class);
           return new ProtocolMessage<>(header, response);
       case HEART_BEAT:
       case OTHERS:
       default:
           throw new RuntimeException("暂不支持该消息类型");
      
      }
      }

}

1
2
3
4
5

#### 请求处理器

作用是接受请求,然后通过反射调用服务实现类。

package com.lkj.yurpc.server.tcp;

import com.lkj.yurpc.model.RpcRequest;
import com.lkj.yurpc.model.RpcResponse;
import com.lkj.yurpc.protocol.*;
import com.lkj.yurpc.registry.LocalRegistry;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;

import java.io.IOException;
import java.lang.reflect.Method;

/**

  • TCP 请求处理器

    */
    public class TcpServerHandler implements Handler {

    /**

    • 处理请求
      *
    • @param socket the event to handle
      */
      @Override
      public void handle(NetSocket socket) {
      TcpBufferHandlerWrapper bufferHandlerWrapper = new TcpBufferHandlerWrapper(buffer -> {

       // 接受请求,解码
       ProtocolMessage<RpcRequest> protocolMessage;
       try {
           protocolMessage = (ProtocolMessage<RpcRequest>) ProtocolMessageDecoder.decode(buffer);
       } catch (IOException e) {
           throw new RuntimeException("协议消息解码错误");
       }
       RpcRequest rpcRequest = protocolMessage.getBody();
       ProtocolMessage.Header header = protocolMessage.getHeader();
      
       // 处理请求
       // 构造响应结果对象
       RpcResponse rpcResponse = new RpcResponse();
       try {
           // 获取要调用的服务实现类,通过反射调用
           Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());
           Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
           Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());
           // 封装返回结果
           rpcResponse.setData(result);
           rpcResponse.setDataType(method.getReturnType());
           rpcResponse.setMessage("ok");
       } catch (Exception e) {
           e.printStackTrace();
           rpcResponse.setMessage(e.getMessage());
           rpcResponse.setException(e);
       }
      
       // 发送响应,编码
       header.setType((byte) ProtocolMessageTypeEnum.RESPONSE.getKey());
       header.setStatus((byte) ProtocolMessageStatusEnum.OK.getValue());
       ProtocolMessage<RpcResponse> responseProtocolMessage = new ProtocolMessage<>(header, rpcResponse);
       try {
           Buffer encode = ProtocolMessageEncoder.encode(responseProtocolMessage);
           socket.write(encode);
       } catch (IOException e) {
           throw new RuntimeException("协议消息编码错误");
       }
      

      });
      socket.handler(bufferHandlerWrapper);
      }

}

1
2
3

#### 请求发送

package com.lkj.yurpc.server.tcp;

import com.lkj.yurpc.protocol.ProtocolConstant;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;

/**

  • TCP 消息处理器包装
  • 装饰者模式,使用 recordParser 对原有的 buffer 处理能力进行增强

    */
    public class TcpBufferHandlerWrapper implements Handler {

    /**

    • 解析器,用于解决半包、粘包问题
      */
      private final RecordParser recordParser;

      public TcpBufferHandlerWrapper(Handler bufferHandler) {
      recordParser = initRecordParser(bufferHandler);
      }

      @Override
      public void handle(Buffer buffer) {
      recordParser.handle(buffer);
      }

      /**

    • 初始化解析器
      *
    • @param bufferHandler
    • @return
      */
      private RecordParser initRecordParser(Handler bufferHandler) {
      // 构造 parser
      RecordParser parser = RecordParser.newFixed(ProtocolConstant.MESSAGE_HEADER_LENGTH);

      parser.setOutput(new Handler() {

       // 初始化
       int size = -1;
       // 一次完整的读取(头 + 体)
       Buffer resultBuffer = Buffer.buffer();
      
       @Override
       public void handle(Buffer buffer) {
           // 1. 每次循环,首先读取消息头
           if (-1 == size) {
               // 读取消息体长度
               size = buffer.getInt(13);
               parser.fixedSizeMode(size);
               // 写入头信息到结果
               resultBuffer.appendBuffer(buffer);
           } else {
               // 2. 然后读取消息体
               // 写入体信息到结果
               resultBuffer.appendBuffer(buffer);
               // 已拼接为完整 Buffer,执行处理
               bufferHandler.handle(resultBuffer);
               // 重置一轮
               parser.fixedSizeMode(ProtocolConstant.MESSAGE_HEADER_LENGTH);
               size = -1;
               resultBuffer = Buffer.buffer();
           }
       }
      

      });

      return parser;
      }
      }

1
2
3
4
5
6
7

### 粘包半包问题解决

#### 概念

加入客户端连续两次发送信息:

// 第一次
Hello, server!Hello, server!Hello, server!Hello, server!
// 第二次
Hello, server!Hello, server!Hello, server!Hello, server!

1
2
3
4
5

单服务器收到的可能是:

* 每次收到的数据更少了,这种情况叫做半包

// 第一次
Hello, server!Hello, server!
// 第二次
Hello, server!Hello, server!Hello, server!

1
2
3

* 每次收到的数据更多了,这种情况叫做粘包

// 第三次
Hello, server!Hello, server!Hello, server!Hello, server!Hello, server!

1
2
3
4
5
6
7

#### 如何解决

* 解决半包

核心思路:在消息体中设置请求体长度,服务端接收到时,判断每次信息长度是否符合预期,不完整就不读。

if (buffer == null || buffer.length() == 0) {
throw new RuntimeException(“消息 buffer 为空”);
}
if (buffer.getBytes().length < ProtocolConstant.MESSAGE_HEADER_LENGTH) {
throw new RuntimeException(“出现了半包问题”);
}

1
2
3
4
5

* 解决粘包

核心思路也是类似的:每次只读取指定长度的数据,超过长度的留着下一次接收到消息时再读取。

// 解决粘包问题,只读指定长度的数据
byte[] bodyBytes = buffer.getBytes(17, 17 + header.getBodyLength());

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

## 负载均衡

### 需求分析

现在 RPC 框架已经可以从注册中心获取到服务提供者的注册信息了,同一个服务可能会有多个服务提供者,但是目前消费者始终读取了第一个服务提供者节点发起调用,不仅会增大单个节点的压力,而且没有利用好其他节点的资源。

我们完全可以从服务提供者节点中,选择一个服务提供者发起请求,而不是每次都请求同一个服务提供者,这个操作就叫做 **负载均衡**。

### 负载均衡

#### 定义

1)何为负载?可以把负载理解为要处理的工作和压力,比如网络请求、事务、数据处理任务等。

2)何为均衡?把工作和压力平均地分配给多个工作者,从而分摊每个工作者的压力,保证大家正常工作。

#### 常见负载均衡算法

* 轮询:按照循坏顺序请求分配给每个服务器
* 随机:随机选择
* 加权轮询:根据服务器的性能或权重分配请求。
* 加权随机
* 最小连接数
* IP Hash

### 开发实现

#### 多种负载器实现

在RPC项目中新建loadbalancer包

* 编写通用接口

package com.lkj.yurpc.loadbalancer;

import com.lkj.yurpc.model.ServiceMetaInfo;

import java.util.List;
import java.util.Map;

/**

  • 负载均衡器(消费端使用)
    *

    */
    public interface LoadBalancer {

    /**

    • 选择服务调用
      *
    • @param requestParams 请求参数
    • @param serviceMetaInfoList 可用服务列表
    • @return
      */
      ServiceMetaInfo select(Map requestParams, List serviceMetaInfoList);
      }
1
2
3

* 轮询负载均衡

package com.lkj.yurpc.loadbalancer;

import com.lkj.yurpc.model.ServiceMetaInfo;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**

  • 轮询负载均衡器

    */
    public class RoundRobinLoadBalancer implements LoadBalancer {

    /**

    • 当前轮询的下标
      */
      private final AtomicInteger currentIndex = new AtomicInteger(0);

      @Override
      public ServiceMetaInfo select(Map requestParams, List serviceMetaInfoList) {
      if (serviceMetaInfoList.isEmpty()) {

       return null;
      

      }
      // 只有一个服务,无需轮询
      int size = serviceMetaInfoList.size();
      if (size == 1) {

       return serviceMetaInfoList.get(0);
      

      }
      // 取模算法轮询
      int index = currentIndex.getAndIncrement() % size;
      return serviceMetaInfoList.get(index);
      }
      }

1
2
3

* 随机负载均衡

package com.lkj.yurpc.loadbalancer;

import com.lkj.yurpc.model.ServiceMetaInfo;

import java.util.List;
import java.util.Map;
import java.util.Random;

/**

  • 随机负载均衡器

    */
    public class RandomLoadBalancer implements LoadBalancer {

    private final Random random = new Random();

    @Override
    public ServiceMetaInfo select(Map requestParams, List serviceMetaInfoList) {

     int size = serviceMetaInfoList.size();
     if (size == 0) {
         return null;
     }
     // 只有 1 个服务,不用随机
     if (size == 1) {
         return serviceMetaInfoList.get(0);
     }
     return serviceMetaInfoList.get(random.nextInt(size));
    

    }
    }

    1
    2
    3

    * 实现一致性 Hash 负载均衡器

    package com.lkj.yurpc.loadbalancer;

import com.lkj.yurpc.model.ServiceMetaInfo;

import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**

  • 一致性哈希负载均衡器
    *

    */
    public class ConsistentHashLoadBalancer implements LoadBalancer {

    /**

    • 一致性 Hash 环,存放虚拟节点
      */
      private final TreeMap virtualNodes = new TreeMap<>();

      /**

    • 虚拟节点数
      */
      private static final int VIRTUAL_NODE_NUM = 100;

      @Override
      public ServiceMetaInfo select(Map requestParams, List serviceMetaInfoList) {
      if (serviceMetaInfoList.isEmpty()) {

       return null;
      

      }

      // 构建虚拟节点环
      for (ServiceMetaInfo serviceMetaInfo : serviceMetaInfoList) {

       for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
           int hash = getHash(serviceMetaInfo.getServiceAddress() + "#" + i);
           virtualNodes.put(hash, serviceMetaInfo);
       }
      

      }

      // 获取调用请求的 hash 值
      int hash = getHash(requestParams);

      // 选择最接近且大于等于调用请求 hash 值的虚拟节点
      Map.Entry entry = virtualNodes.ceilingEntry(hash);
      if (entry == null) {

       // 如果没有大于等于调用请求 hash 值的虚拟节点,则返回环首部的节点
       entry = virtualNodes.firstEntry();
      

      }
      return entry.getValue();
      }

/**
 * Hash 算法,可自行实现
 *
 * @param key
 * @return
 */
private int getHash(Object key) {
    return key.hashCode();
}

}

1
2
3
4
5
6
7

#### 支持配置和扩展

一个成熟的 RPC 框架可能会支持多个负载均衡器,像序列化器和注册中心一样,我们的需求是,让开发者能够填写配置来指定使用的负载均衡器,并且支持自定义负载均衡器,让框架更易用、更利于扩展。

* 负载均衡器常量

package com.lkj.yurpc.loadbalancer;

/**

  • 负载均衡器键名常量
    *

    */
    public interface LoadBalancerKeys {

    /**

    • 轮询
      */
      String ROUND_ROBIN = “roundRobin”;

      String RANDOM = “random”;

      String CONSISTENT_HASH = “consistentHash”;

}

1
2
3

* 使用工厂模式,支持根据key从SPI获取负载均衡器对象实例

package com.lkj.yurpc.loadbalancer;

import com.lkj.yurpc.spi.SpiLoader;

/**

  • 负载均衡器工厂(工厂模式,用于获取负载均衡器对象)
    *

    */
    public class LoadBalancerFactory {

    static {

     SpiLoader.load(LoadBalancer.class);
    

    }

    /**

    • 默认负载均衡器
      */
      private static final LoadBalancer DEFAULT_LOAD_BALANCER = new RoundRobinLoadBalancer();

      /**

    • 获取实例
      *
    • @param key
    • @return
      */
      public static LoadBalancer getInstance(String key) {
      return SpiLoader.getInstance(LoadBalancer.class, key);
      }

}

1
2
3

* 为RPC全局配置新增

/**

 * 负载均衡器
 */
private String loadBalancer = LoadBalancerKeys.ROUND_ROBIN;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

## 重试机制

### 需求分析

目前,如果使用 RPC 框架的服务消费者调用接口失败,就会直接报错。

调用接口失败可能有很多原因,有时可能是服务提供者返回了错误,但有时可能只是网络不稳定或服务提供者重启等临时性问题。这种情况下,我们可能更希望服务消费者拥有自动重试的能力,提高系统的可用性。

### 设计方案

#### 重试条件

当由于网络等异常情况发生时,触发重试。

#### 重试时间

* 固定重试间隔:在每次重试之间使用固定的时间间隔
* 指数退避重试:在每次失败后,重试的时间间隔会以指数级增加,以避免请求过于密集。
* 随机延迟重试:在每次重试之间使用随机的时间间隔,以避免请求的同时发生。
* 可变延迟重试:这种策略更 “高级” 了,根据先前重试的成功或失败情况,动态调整下一次重试的延迟时间。比如,根据前一次的响应时间调整下一次重试的等待时间。

#### 停止重试

一般来说,重试次数是有上限的,否则随着报错的增多,系统同时发生的重试也会越来越多,造成雪崩。

主流的停止重试策略有:

1)最大尝试次数:一般重试当达到最大次数时不再重试。

2)超时停止:重试达到最大时间的时候,停止重试。

#### 重试工作

需要注意的是,当重试次数超过上限时,往往还要进行其他的操作,比如:

1. 通知告警:让开发者人工介入
2. 降级容错:改为调用其他接口、或者执行其他操作

### 开发实现

* 编写重试策略通用接口

package com.lkj.yurpc.fault.retry;

import com.lkj.yurpc.model.RpcResponse;

import java.util.concurrent.Callable;

/**

  • 重试策略
    *

    */
    public interface RetryStrategy {

    /**

    • 重试
      *
    • @param callable
    • @return
    • @throws Exception
      */
      RpcResponse doRetry(Callable callable) throws Exception;
      }
1
2
3

* 不重试策略实现

package com.lkj.yurpc.fault.retry;

import com.lkj.yurpc.model.RpcResponse;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Callable;

/**

  • 不重试 - 重试策略
    *

    */
    @Slf4j
    public class NoRetryStrategy implements RetryStrategy {

    /**

    • 重试
      *
    • @param callable
    • @return
    • @throws Exception
      */
      public RpcResponse doRetry(Callable callable) throws Exception {
      return callable.call();
      }

}

1
2
3

* 固定重试间隔策略

package com.lkj.yurpc.fault.retry;

import com.github.rholder.retry.*;
import com.lkj.yurpc.model.RpcResponse;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**

  • 固定时间间隔 - 重试策略
    *

    */
    @Slf4j
    public class FixedIntervalRetryStrategy implements RetryStrategy {

    /**

    • 重试
      *
    • @param callable
    • @return
    • @throws ExecutionException
    • @throws RetryException
      */
      public RpcResponse doRetry(Callable callable) throws ExecutionException, RetryException {
      Retryer retryer = RetryerBuilder.newBuilder()
           .retryIfExceptionOfType(Exception.class)
           .withWaitStrategy(WaitStrategies.fixedWait(3L, TimeUnit.SECONDS))
           .withStopStrategy(StopStrategies.stopAfterAttempt(3))
           .withRetryListener(new RetryListener() {
               @Override
               public <V> void onRetry(Attempt<V> attempt) {
                   log.info("重试次数 {}", attempt.getAttemptNumber());
               }
           })
           .build();
      
      return retryer.call(callable);
      }

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

上述代码中,重试策略如下:

- 重试条件:使用 retryIfExceptionOfType 方法指定当出现 Exception 异常时重试。
- 重试等待策略:使用 withWaitStrategy 方法指定策略,选择 fixedWait 固定时间间隔策略。
- 重试停止策略:使用 withStopStrategy 方法指定策略,选择 stopAfterAttempt 超过最大重试次数停止。
- 重试工作:使用 withRetryListener 监听重试,每次重试时,除了再次执行任务外,还能够打印当前的重试次数。

#### 支持配置和扩展重试策略

需求是,让开发者能够填写配置来指定使用的重试策略,并且支持自定义重试策略,让框架更易用、更利于扩展。

要实现这点,开发方式和序列化器、注册中心、负载均衡器都是一样的,都可以使用工厂创建对象、使用 SPI 动态加载自定义的注册中心。

* 常量

package com.lkj.yurpc.fault.retry;

/**

  • 重试策略键名常量
    *

    */
    public interface RetryStrategyKeys {

    /**

    • 不重试
      */
      String NO = “no”;

      /**

    • 固定时间间隔
      */
      String FIXED_INTERVAL = “fixedInterval”;

}

1
2
3

* 使用工厂模式,根据key从SP获取重试策略对象实例

package com.lkj.yurpc.fault.retry;

import com.lkj.yurpc.spi.SpiLoader;

/**

  • 重试策略工厂(用于获取重试器对象)
    *

    */
    public class RetryStrategyFactory {

    static {

     SpiLoader.load(RetryStrategy.class);
    

    }

    /**

    • 默认重试器
      */
      private static final RetryStrategy DEFAULT_RETRY_STRATEGY = new NoRetryStrategy();

      /**

    • 获取实例
      *
    • @param key
    • @return
      */
      public static RetryStrategy getInstance(String key) {
      return SpiLoader.getInstance(RetryStrategy.class, key);
      }

}

1
2
3

* 应用重试功能

// 使用重试机制
RetryStrategy retryStrategy = RetryStrategyFactory.getInstance(rpcConfig.getRetryStrategy());
RpcResponse rpcResponse = retryStrategy.doRetry(() ->
VertxTcpClient.doRequest(rpcRequest, selectedServiceMetaInfo)
);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

## 容错机制

### 需求分析

当系统出现异常时,我们仍要保证可以通过一定策略稳步运行。

### 设计方案

#### 容错机制

容错是指系统在出现异常情况时,可以通过一定的策略保证系统仍然稳定运行,从而提高系统的可靠性和健壮性。1800771338183979009_0.5649165956367674

在分布式系统中,容错机制尤为重要,因为分布式系统中的各个组件都可能存在网络故障、节点故障等各种异常情况。要顾全大局,尽可能消除偶发 / 单点故障对系统带来的整体影响。

打个比方,将分布式系统类比为一家公司,如果公司某个优秀员工请假了,需要 “触发容错”,让另一个普通员工顶上,这本质上是容错机制的一种 **降级** 策略。

##### 容错策略

容错策略有很多种,常用的容错策略主要是以下几个:

1)Fail-Over 故障转移:一次调用失败后,切换一个其他节点再次进行调用,也算是一种重试。1800771338183979009_0.0852483407241389

2)Fail-Back 失败自动恢复:系统的某个功能出现调用失败或错误时,通过其他的方法,恢复该功能的正常。可以理解为降级,比如重试、调用其他服务等。

3)Fail-Safe 静默处理:系统出现部分非重要功能的异常时,直接忽略掉,不做任何处理,就像错误没有发生过一样。

4)Fail-Fast 快速失败:系统出现调用错误时,立刻报错,交给外层调用方处理。

##### 容错实现方式

容错其实是个比较广泛的概念,除了上面几种策略外,很多技术都可以起到容错的作用。

比如:

1)重试:重试本质上也是一种容错的降级策略,系统错误后再试一次。

2)限流:当系统压力过大、已经出现部分错误时,通过限制执行操作(接受请求)的频率或数量,对系统进行保护。

3)降级:系统出现错误后,改为执行其他更稳定可用的操作。也可以叫做 “兜底” 或 “有损服务”,这种方式的本质是:即使牺牲一定的服务质量,也要保证系统的部分功能可用,保证基本的功能需求得到满足。

4)熔断:系统出现故障或异常时,暂时中断对该服务的请求,而是执行其他操作,以避免连锁故障。

5)超时控制:如果请求或操作长时间没处理完成,就进行中断,防止阻塞和资源占用。

注意,在实际项目中,根据对系统可靠性的需求,我们通常会结合多种策略或方法实现容错机制。

### 开发实现

* 通用接口

package com.lkj.yurpc.fault.tolerant;

import com.lkj.yurpc.model.RpcResponse;

import java.util.Map;

/**

  • 容错策略
    *

    */
    public interface TolerantStrategy {

    /**

    • 容错
      *
    • @param context 上下文,用于传递数据
    • @param e 异常
    • @return
      */
      RpcResponse doTolerant(Map context, Exception e);
      }
      1
      2
      3
      4
      5

      * 快速失败容错策略实现

      遇到异常后,将异常再次抛出,交给外层实现

      package com.lkj.yurpc.fault.tolerant;

import com.lkj.yurpc.model.RpcResponse;

import java.util.Map;

/**

  • 快速失败 - 容错策略(立刻通知外层调用方)
    *

    */
    public class FailFastTolerantStrategy implements TolerantStrategy {

    @Override
    public RpcResponse doTolerant(Map context, Exception e) {

     throw new RuntimeException("服务报错", e);
    

    }
    }

1
2
3
4
5

* 静默处理容错策略实现

遇到异常后,记录一条日志,然后正常返回一个响应对象

package com.lkj.yurpc.fault.tolerant;

import com.lkj.yurpc.model.RpcResponse;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;

/**

  • 静默处理异常 - 容错策略
    *

    */
    @Slf4j
    public class FailSafeTolerantStrategy implements TolerantStrategy {

    @Override
    public RpcResponse doTolerant(Map context, Exception e) {

     log.info("静默处理异常", e);
     return new RpcResponse();
    

    }
    }

1
2
3
4
5

#### 配置和扩展容错策略

* 常量

package com.lkj.yurpc.fault.tolerant;

/**

  • 容错策略键名常量
    *

    */
    public interface TolerantStrategyKeys {

    /**

    • 故障恢复
      */
      // String FAIL_BACK = “failBack”;

      /**

    • 快速失败
      */
      String FAIL_FAST = “failFast”;

      /**

    • 故障转移
      */
      // String FAIL_OVER = “failOver”;

      /**

    • 静默处理
      */
      String FAIL_SAFE = “failSafe”;

}

1
2
3

* 使用工厂模式

package com.lkj.yurpc.fault.tolerant;

import com.lkj.yurpc.spi.SpiLoader;

/**

  • 容错策略工厂(工厂模式,用于获取容错策略对象)
    *

    */
    public class TolerantStrategyFactory {

    static {

     SpiLoader.load(TolerantStrategy.class);
    

    }

    /**

    • 默认容错策略
      */
      private static final TolerantStrategy DEFAULT_RETRY_STRATEGY = new FailFastTolerantStrategy();

      /**

    • 获取实例
      *
    • @param key
    • @return
      */
      public static TolerantStrategy getInstance(String key) {
      return SpiLoader.getInstance(TolerantStrategy.class, key);
      }

}

1
2
3

* 应用

// rpc 请求
// 使用重试机制
RpcResponse rpcResponse;
try {
RetryStrategy retryStrategy = RetryStrategyFactory.getInstance(rpcConfig.getRetryStrategy());
rpcResponse = retryStrategy.doRetry(() ->
VertxTcpClient.doRequest(rpcRequest, selectedServiceMetaInfo)
);
} catch (Exception e) {
// 容错机制
TolerantStrategy tolerantStrategy = TolerantStrategyFactory.getInstance(rpcConfig.getTolerantStrategy());
rpcResponse = tolerantStrategy.doTolerant(null, e);
}
return rpcResponse.getData();

```