手写RPC框架
简易版本RPC开发
基本概念
什么是RPC
RPC:Remote Procedure Call即远程过程调用,是一种计算机通信协议,允许程序在不同计算机之间进行通信和交互,就像本地调用一样。
为什么需要RPC
- 透明性:RPC 提供了一种机制,使得远程服务调用看起来就像本地方法调用一样。开发者不需要关心底层的网络通信细节,可以专注于业务逻辑。
- 封装性:RPC 框架通常会处理序列化、反序列化、网络传输等复杂操作,将这些细节从应用程序中抽象出来。
- 模块化:通过将服务拆分为多个独立的服务,并使用 RPC 进行通信,可以实现更好的模块化。每个服务都可以独立开发、部署和扩展。
- 解耦合:服务之间的依赖关系可以通过接口定义来管理,而不是直接的代码依赖。这使得系统的各个部分更加松散耦合,更容易进行修改和升级。
RPC框架实现思路
基本设计
假设有消费者和服务提供者两个角色
消费者想要调用提供者,就需要提供者启动一个web服务,然后通过请求客户端发送HTTP请求或者其他协议来调用。
若有多个服务和方法,每个接口都要单独写一个接口,则较为麻烦。
则提供一个统一的服务调用接口,通过请求处理器根据客户端的请求参数来进行不同的处理、调用不同的服务和方法。
可以在服务提供者维护一个本地服务注册器,记录服务和对应实现类的映射。
另外,需要注意的是,java对象在网络中无法直接传输,所以需要对传输的参数进行序列化和反序列化。
所以,一个简易的RPC框架就此诞生。
开发简易版RPC框架
简易版实现几个模块:
- example-common:示例代码的公共依赖,包括接口、model等
- example-consumer:示例服务消费者代码
- example-provide:示例服务t提供者代码
- lkj-rpc-easy:简易版rpc框架
公共模块
- 编写实体类User
1 | package com.lkj.example.common.model; |
对象需要实习序列化接口,为后续网络传输序列化提供支持
- 编写用户服务接口UserService,提供一个获取用户的方法
1 | package com.lkj.example.common.service; |
服务提供者
- 编写服务实现类,实现公共模块中定义的用户服务接口
功能是打印用户的名称,并且返回参数中的用户对象
1 | package com.lkj.example.provider; |
- 编写服务提供者启动类
1 | package com.lkj.example.provider; |
服务消费者
- 创建服务消费者启动类,编写调用接口的代码
1 | package com.lkj.example.consumer; |
这里我们还没有获取userService实例,所以定义为null
web服务器
要让服务提供者提供可远程访问的服务,那么就需要一个web服务器,接受处理并返回请求。
此处使用高性能的NIO框架Vert.x作为RPC框架的web服务器。
- 编写一个web服务器的接口HttpServer,定义统一的启动服务器方法
1 | package com.lkj.yurpc.server; |
- 编写基于Vert.x的web服务器VertxHttpServer,能够箭筒指定端口并处理请求
1 | package com.lkj.yurpc.server; |
本地服务注册器
在简易版本中,暂时不用第三方注册中心,直接把服务注册到服务提供者本地。
- 使用线程安全的ConcurrentHashMap存储服务注册信息,key为服务名称,value为服务的实现类
1 | package com.lkj.yurpc.registry; |
序列化器
- 序列化:将java对象转为可传输的字节数组
- 反序列化:将字节数组转换为可传输的java对象
为了方便此处选择java原生的序列化器。
1 | package com.lkj.yurpc.serializer; |
提供者调用-请求处理器
- RpcRequest的作用是封装调用所需的信息,比如服务名称,方法名称,调用参数的类型列表,参数列表。
1 | package com.lkj.yurpc.model; |
- 响应类RpcResponse的作用是封装调用方法得到的返回值、以及调用的信息等。
1 | package com.lkj.yurpc.model; |
- 编写请求处理器 HttpServerHandler
业务流程如下:
1.反序列化请求为对象,并从请求对象中获取参数
2.根据服务名称从本地注册器中获取到对应的服务实现类
3.通过反射机制调用方法,得到返回结果
4.对返回结果进行封装和序列化,并写入响应
1 | package com.lkj.yurpc.server; |
消费方发起调用-代理
代理的实现方式大致分为2类:静态代理和动态代理
静态代理
静态代理是指为每一个特定类型的接口或对象,编写一个代理类。
静态代理虽然很好理解,但缺点也很明显,如果给每个服务接口都写一个实现类,是非常麻烦的,代理方式灵活性很差。
所以RPC框架中,会使用动态代理。
动态代理
动态代理的作用是,根据要生成的对象类型,自动生成一个代理对象。
常用的动态代理实现方式有JDK动态代理和基于字节码生成的动态代理。前者简单易用、无需引入额外的库,但缺点是只能对接口进行代理;后者更灵活、可以对任何类进行代理,但性能略低于JDK动态代理。
1 | package com.lkj.yurpc.proxy; |
当用户调用某个接口的方法时,会改为调用invoke方法。在invoke方法中,我们可以获取到要调用的方法信息,传入的参数列表等。
需要注意的时,上述代码中,请求的服务提供者地址被硬编码了,需要使用注册中心和服务发现机制来解决。
- 创建动态代理工厂ServiceProxyFactory,作用是根据指定类创建动态代理对象
1 | package com.lkj.yurpc.proxy; |
简易测试
静态代理的调试过程
- 直接跳到20行,开始调用getuser
- 进入UserServiceProxy的getUser方法
- 执行到http请求,进入拦截器部分,即第35行
- 开始执行拦截器,进行反射调用返回结果,中间进行序列化反序列化等操作
- 返回代理中,得到结果
动态代理调试过程
InvocationHandler
接口:ServiceProxy
类实现了InvocationHandler
接口,这是 Java 动态代理的核心接口。invoke
方法:invoke
方法是InvocationHandler
接口中的唯一方法。这个方法会在每次通过代理对象调用方法时被调用。invoke方法接收三个参数:
proxy
:代理对象本身。method
:被调用的方法。args
:方法的参数。
动态代理的工作流程
- 构造请求:
invoke
方法首先构建一个RpcRequest
对象,包含服务名、方法名、参数类型和参数值。 - 序列化请求:将
RpcRequest
对象序列化为字节数组。 - 发送请求:使用 HTTP 请求库(如 Hutool)将序列化的请求发送到指定的服务器地址(这里是硬编码的
http://localhost:8080
)。 - 接收响应:从服务器接收响应,并将其反序列化为
RpcResponse
对象。 - 返回结果:从
RpcResponse
中提取数据并返回给调用者。
与静态代理的区别
静态代理
- 手动创建代理类:静态代理需要为每个接口手动创建一个代理类,代理类实现了相同的接口。
- 编译时确定:静态代理在编译时就已经确定,代理类的代码是固定的。
- 灵活性差:如果接口发生变化,需要手动修改代理类,维护成本高。
动态代理
- 自动生成代理类:动态代理在运行时自动生成代理类,不需要手动编写代理类。
- 运行时确定:动态代理在运行时根据实际调用的方法动态生成代理类。
- 灵活性高:动态代理可以灵活地添加或修改拦截逻辑,适用于多种场景,特别是当接口经常变化时。
由于动态代理中,invoke方法会在每次通过代理对象调用方法时被调用。拦截器调用次数多,调试较为繁琐,所以不做图片展示。
全局配置加载
需求分析
在RPC框架的运行中,会涉及到很多的配置信息,比如注册中心的地址,序列化方式,网络服务器端口号等等。
在简易版本的RPC项目中,我们采用了硬编码的方式,不利于维护。
同时PRC框架是需要被其他项目作为提供者或消费者引入的,所以我们应当允许引入框架的项目编写配置文件来自定义配置。
因此,需要配置一套全局配置加载功能。
项目实现
配置加载
- 新建RpcConfig,用于保存配置信息
1 | package com.lkj.yurpc.config; |
- 新建工具类ConfigUtils,作用是读配置文件并返回配置对象
1 | package com.lkj.yurpc.utils; |
- 新建RpcConstant接口,用于存储RPC框架相关常量
1 | package com.lkj.yurpc.constant; |
维护全局配置对象
RPC框架中需要维护一个全局的配置对象,在引入RPC框架的项目启动时,从配置文件中读取配置并创建对象实例,之后就可以集中的从这个对象获取配置信息,而不是每次加载配置时重新读取配置,并创建新的对象,减少了性能开销。
使用设计模式中的单例模式。
单例模式(Singleton Pattern)是设计模式中的一种创建型模式,它确保一个类只有一个实例,并提供一个全局访问点。单例模式的主要目的是控制共享资源的访问,例如数据库连接或线程池。
单例模式的特点
- 唯一实例:确保一个类只有一个实例。
- 全局访问点:提供一个全局的访问点来获取这个唯一的实例。
- 延迟初始化:可以实现懒加载(Lazy Initialization),即在第一次使用时才创建实例。
- 线程安全:在多线程环境下,必须保证单例的创建是线程安全的。
一般情况下,我们会使用holder来维护全局配置对象实例。
1 | package com.lkj.yurpc; |
测试
在example-consumer项目的resourses目录编写配置文件application.properties
1 | rpc.name=yurpc |
创建ConsumerExample作为扩展后RPC项目的示例消费者类,测试配置文件读取。
1 | import com.lkj.yurpc.config.RpcConfig; |
接口Mock
需求分析
什么是Mock
RPC框架的核心功能是调用其他远程服务。但是在实际开发和测试过程中,有时候可能无法访问真实的远程服务,并且远程服务可能产生不可控的影响,例如网络延迟,服务不稳定等。在这种情况下,就需要使用mock服务来模拟远程服务的行为,以便进行接口的测试开发和调试。
mock是指模拟对象,通常用于测试代码中,特别是在单元测试中,便于我们跑通业务流程。
设计方案
通过动态代理创建一个调用方法时返回固定值的对象。
开发实现
- 首先给全局配置类RpcConfig新增mock字段,默认值为false
- 在Proxy包下新增MockServiceProxy类,用于生成mock代理方法。
在这个类中,需要提供一个根据服务接口类型返回固定值的方法。
1 | package com.lkj.yurpc.proxy; |
- 给ServiceProxyFactory服务代理工厂新增获取mock代理对象的方法getMockProxy。
1 | /** |
测试
- 在UserService中写个具有默认实现的新方法
1 | package com.lkj.example.common.service; |
- 修改ConsumerExample类,编写调用的新方法
1 | public class ConsumerExample { |
可以看到输出的结果为0,而不是1,说明调用了模拟服务代理。
SPI
- SPI(Service Provider Interface) 服务提供接口是Java的机制,主要用于实现模块化开发和插件化开发扩展。
- SPI机制允许服务提供者通过特定的配置文件将自己的实现注册到系统,然后系统通过反射机制动态加载这些实现,而不需要修改原始框架的代码,从而实现了系统的解耦、提高了可扩展性。
一个典型的SPI应用场景是JDBC
- 编写SpiLoder加载器
相当于一个工具类,提供了读取配置并加载实现类的方法。
关键实现如下:
- 用map来存储已知的配置信息,键名->实现类
- 扫描指定路径,读取每个配置文件,获取到键名->实现类信息并存储在map中
- 定义获取实例方法,根据用户传入的接口和键名,从map中寻找对应的实现类,然后通过反射获取到实现类对象。可以维护一个对象实例缓存,创建过一次的对象从缓存中读取即可。
1 | package com.lkj.yurpc.spi; |
注册中心基本实现
需求分析
RPC框架的一个核心模块式注册中心,其目的是帮助服务消费者获取到服务提供者的调用地址,而不是把调用地址硬编码到项目中
设计方案
注册中心核心功能
- 数据分布式存储:集中的注册信息数据存储、读取和共享
- 服务注册:服务提供者上报服务信息到注册中心
- 服务发现:服务消费者从注册中心拉取服务信息
- 心跳检测:定期检查服务提供者的存活状态
- 服务注销:手动剔除节点,或者自动删除失效节点
技术选型
- 需要一个能够集中存储和读取数据的中间件。还需要有数据过期,数据监听的能力,使我们移除失效节点,更新节点列表。
ETCD
数据结构
采用层次化的键值对来存储数据,支持类似于文件系统路径的层次结构
核心结构:
- key:Etcd的基本数据单元,类似于文件系统的文件名。每个键都唯一标识一个值,并且可以包含子键,形成于类似路径的层次结构。
- value:与键关联的数据,通常为字符型
ETCD核心特性:
- Lease(租约):用于对键值对进行TTL超时设置,即设置键值对的过期时间。
- Watch(监听):可以监视特定键的变化,当键的值发生变化时,会触发相应通知。
开发实现
注册中心开发
- 注册信息定义
在model包下新建ServiceMetaInfo类,封装服务的注册信息,包括服务名称、服务版本号、服务地址和服务分组等。
1 | package com.lkj.yurpc.model; |
- 注册中心配置
在config包下编写注册中心配置类RegistryConfig,让用户配置连接注册中心所需的信息,比如注册中心类别、注册中心地址、用户名、密码、连接超时等。
1 | package com.lkj.yurpc.config; |
- 注册中心接口
遵循可扩展机制,先写一个注册中心接口,后续可以实现多种不同的注册中心,使用SPI机制动态加载
1 | package com.lkj.yurpc.registry; |
- etcd注册中心实现
1 | package com.lkj.yurpc.registry; |
支持配置和扩展注册中心
一个成熟的rpc框架可能会支持多个注册中心,我们的需求是,让开发者能够填写配置来指定使用的注册中心,并且支持自定义注册中心
- 注册中心常量
1 | package com.lkj.yurpc.registry; |
- 工厂模式
1 | public class RegistryFactory { |
- 修改ServiceProxy类
[ ] ```
package com.lkj.yurpc.proxy;import cn.hutool.core.collection.CollUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.lkj.yurpc.RpcApplication;
import com.lkj.yurpc.config.RpcConfig;
import com.lkj.yurpc.constant.RpcConstant;
import com.lkj.yurpc.model.RpcRequest;
import com.lkj.yurpc.model.RpcResponse;
import com.lkj.yurpc.model.ServiceMetaInfo;
import com.lkj.yurpc.registry.Registry;
import com.lkj.yurpc.registry.RegistryFactory;
import com.lkj.yurpc.serializer.Serializer;
import com.lkj.yurpc.serializer.SerializerFactory;import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.List;/**
服务代理(JDK 动态代理)
**/
public class ServiceProxy implements InvocationHandler {/**
- 调用代理
* - @return
@throws Throwable
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 指定序列化器
final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());// 构造请求
String serviceName = method.getDeclaringClass().getName();
RpcRequest rpcRequest = RpcRequest.builder().serviceName(serviceName) .methodName(method.getName()) .parameterTypes(method.getParameterTypes()) .args(args) .build();
try {
// 序列化 byte[] bodyBytes = serializer.serialize(rpcRequest); // 从注册中心获取服务提供者请求地址 RpcConfig rpcConfig = RpcApplication.getRpcConfig(); Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry()); ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo(); serviceMetaInfo.setServiceName(serviceName); serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION); List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey()); if (CollUtil.isEmpty(serviceMetaInfoList)) { throw new RuntimeException("暂无服务地址"); } ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0); // 发送请求 try (HttpResponse httpResponse = HttpRequest.post(selectedServiceMetaInfo.getServiceAddress()) .body(bodyBytes) .execute()) { byte[] result = httpResponse.bodyBytes(); // 反序列化 RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class); return rpcResponse.getData(); }
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
- 调用代理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
## 注册中心优化
### 需求分析
之前我们完成了基础的注册中心,能够注册和获取服务和节点信息。
对于数据一致性、性能优化、高可用性、可扩展性还有很多优化空间。
### 注册中心优化
#### 心跳检测和持续机制
##### 介绍
心跳检测是一种用于检测系统是否正常工作的机制。通过定期发送心跳信号来检测目标系统的状态。
##### 方案设计
实现心跳检测一般需要两个关键:定时、网络请求
因为etcd自带了key过期机制,我们可以给节点注册信息一个生命倒计时,让节点定期续期,重置自己的倒计时。如果一直不重置,则删除。
* 服务提供者向etcd注册自己的服务信息,并在注册时设置TTL
* etcd在收到信息后,自动维护ttl,并在ttl过期时删除
* 服务提供者定期请求etcd续签自己的注册信息,重写ttl
##### 开发实现
* 给注册中心补充心跳检测方法/**
- 心跳检测(服务端)
*/
void heartBeat();/**1
2
3
* 维护续期节点集合 本机注册的节点 key 集合(用于维护续期)
*/
private final SetlocalRegisterNodeKeySet = new HashSet<>(); 1
2
3
* 实现hearBeat方法@Override
public void heartBeat() {
// 10 秒续签一次
CronUtil.schedule(“/10 “, new Task() {@Override public void execute() { // 遍历本节点所有的 key for (String key : localRegisterNodeKeySet) { try { List<KeyValue> keyValues = kvClient.get(ByteSequence.from(key, StandardCharsets.UTF_8)) .get() .getKvs(); // 该节点已过期(需要重启节点才能重新注册) if (CollUtil.isEmpty(keyValues)) { continue; } // 节点未过期,重新注册(相当于续签) KeyValue keyValue = keyValues.get(0); String value = keyValue.getValue().toString(StandardCharsets.UTF_8); ServiceMetaInfo serviceMetaInfo = JSONUtil.toBean(value, ServiceMetaInfo.class); register(serviceMetaInfo); } catch (Exception e) { throw new RuntimeException(key + "续签失败", e); } } }
});
// 支持秒级别定时任务
CronUtil.setMatchSecond(true);
CronUtil.start();
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#### 服务节点下线机制
当服务提供者节点宕机时,应该从注册中心移除已注册的节点。
##### 方案设计
服务节点下线又分为:
* 主动下线:服务提供者项目正常退出
* 被动下线:异常退出
被动下线已经实现,现在开发主动下线。
利用JVM的ShutdownHook机制。
##### 开发实现
* 完善destorypublic void destroy() {
System.out.println(“当前节点下线”);
// 下线节点
// 遍历本节点所有的 key
for (String key : localRegisterNodeKeySet) {
try {kvClient.delete(ByteSequence.from(key, StandardCharsets.UTF_8)).get();
} catch (Exception e) {
throw new RuntimeException(key + "节点下线失败");
}
}// 释放资源
if (kvClient != null) {
kvClient.close();
}
if (client != null) {
client.close();
}
}
1 |
|
public static void init(RpcConfig newRpcConfig) {
rpcConfig = newRpcConfig;
log.info(“rpc init, config = {}”, newRpcConfig.toString());
// 注册中心初始化
RegistryConfig registryConfig = rpcConfig.getRegistryConfig();
Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());
registry.init(registryConfig);
log.info(“registry init, config = {}”, registryConfig);
// 创建并注册 Shutdown Hook,JVM 退出时执行操作
Runtime.getRuntime().addShutdownHook(new Thread(registry::destroy));
}
1 |
|
package com.lkj.yurpc.protocol;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
- 协议消息结构
*
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ProtocolMessage
/**
* 消息头
*/
private Header header;
/**
* 消息体(请求或响应对象)
*/
private T body;
/**
* 协议消息头
*/
@Data
public static class Header {
/**
* 魔数,保证安全性
*/
private byte magic;
/**
* 版本号
*/
private byte version;
/**
* 序列化器
*/
private byte serializer;
/**
* 消息类型(请求 / 响应)
*/
private byte type;
/**
* 状态
*/
private byte status;
/**
* 请求 id
*/
private long requestId;
/**
* 消息体长度
*/
private int bodyLength;
}
}
1 |
|
package com.lkj.yurpc.protocol;
/**
协议常量
**/
public interface ProtocolConstant {/**
消息头长度
*/
int MESSAGE_HEADER_LENGTH = 17;/**
协议魔数
*/
byte PROTOCOL_MAGIC = 0x1;/**
- 协议版本号
*/
byte PROTOCOL_VERSION = 0x1;
}
1 |
|
package com.lkj.yurpc.protocol;
import lombok.Getter;
/**
- 协议消息的状态枚举
@Getter
public enum ProtocolMessageStatusEnum {
OK("ok", 20),
BAD_REQUEST("badRequest", 40),
BAD_RESPONSE("badResponse", 50);
private final String text;
private final int value;
ProtocolMessageStatusEnum(String text, int value) {
this.text = text;
this.value = value;
}
/**
* 根据 value 获取枚举
*
* @param value
* @return
*/
public static ProtocolMessageStatusEnum getEnumByValue(int value) {
for (ProtocolMessageStatusEnum anEnum : ProtocolMessageStatusEnum.values()) {
if (anEnum.value == value) {
return anEnum;
}
}
return null;
}
}1
2
3
* 协议信息类型枚举,包括请求响应心跳等。
package com.lkj.yurpc.protocol;
import lombok.Getter;
/**
协议消息的类型枚举
**/
@Getter
public enum ProtocolMessageTypeEnum {REQUEST(0),
RESPONSE(1),
HEART_BEAT(2),
OTHERS(3);private final int key;
ProtocolMessageTypeEnum(int key) {
this.key = key;
}
/**
- 根据 key 获取枚举
* - @param key
- @return
*/
public static ProtocolMessageTypeEnum getEnumByKey(int key) {
for (ProtocolMessageTypeEnum anEnum : ProtocolMessageTypeEnum.values()) {
}if (anEnum.key == key) { return anEnum; }
return null;
}
}package com.lkj.yurpc.protocol;1
2
3
* 协议消息的序列化器枚举
- 根据 key 获取枚举
import cn.hutool.core.util.ObjectUtil;
import lombok.Getter;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
@Getter
public enum ProtocolMessageSerializerEnum {
JDK(0, "jdk"),
JSON(1, "json"),
KRYO(2, "kryo"),
HESSIAN(3, "hessian");
private final int key;
private final String value;
ProtocolMessageSerializerEnum(int key, String value) {
this.key = key;
this.value = value;
}
/**
* 获取值列表
*
* @return
*/
public static List<String> getValues() {
return Arrays.stream(values()).map(item -> item.value).collect(Collectors.toList());
}
/**
* 根据 key 获取枚举
*
* @param key
* @return
*/
public static ProtocolMessageSerializerEnum getEnumByKey(int key) {
for (ProtocolMessageSerializerEnum anEnum : ProtocolMessageSerializerEnum.values()) {
if (anEnum.key == key) {
return anEnum;
}
}
return null;
}
/**
* 根据 value 获取枚举
*
* @param value
* @return
*/
public static ProtocolMessageSerializerEnum getEnumByValue(String value) {
if (ObjectUtil.isEmpty(value)) {
return null;
}
for (ProtocolMessageSerializerEnum anEnum : ProtocolMessageSerializerEnum.values()) {
if (anEnum.value.equals(value)) {
return anEnum;
}
}
return null;
}
}1
2
3
4
5
6
7
8
9
#### 网络传输
新建server.tcp包,将所有TCP服务相关的代码放到该包中
* TCP服务器实现
新建VertcTcpServer类
package com.lkj.yurpc.server.tcp;
import io.vertx.core.Vertx;
import io.vertx.core.net.NetServer;
import lombok.extern.slf4j.Slf4j;
/**
Vertx TCP 服务器
**/
@Slf4j
public class VertxTcpServer {public void doStart(int port) {
// 创建 Vert.x 实例 Vertx vertx = Vertx.vertx(); // 创建 TCP 服务器 NetServer server = vertx.createNetServer(); // 处理请求 server.connectHandler(new TcpServerHandler()); // 启动 TCP 服务器并监听指定端口 server.listen(port, result -> { if (result.succeeded()) { log.info("TCP server started on port " + port); } else { log.info("Failed to start TCP server: " + result.cause()); } });
}
public static void main(String[] args) {
new VertxTcpServer().doStart(8888);
}
}
1 |
|
package com.lkj.yurpc.server.tcp;
import cn.hutool.core.util.IdUtil;
import com.yupi.yurpc.RpcApplication;
import com.yupi.yurpc.model.RpcRequest;
import com.yupi.yurpc.model.RpcResponse;
import com.yupi.yurpc.model.ServiceMetaInfo;
import com.yupi.yurpc.protocol.*;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
Vertx TCP 请求客户端
**/
public class VertxTcpClient {/**
- 发送请求
* - @param rpcRequest
- @param serviceMetaInfo
- @return
- @throws InterruptedException
@throws ExecutionException
*/
public static RpcResponse doRequest(RpcRequest rpcRequest, ServiceMetaInfo serviceMetaInfo) throws InterruptedException, ExecutionException {
// 发送 TCP 请求
Vertx vertx = Vertx.vertx();
NetClient netClient = vertx.createNetClient();
CompletableFutureresponseFuture = new CompletableFuture<>();
netClient.connect(serviceMetaInfo.getServicePort(), serviceMetaInfo.getServiceHost(),result -> { if (!result.succeeded()) { System.err.println("Failed to connect to TCP server"); return; } NetSocket socket = result.result(); // 发送数据 // 构造消息 ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<>(); ProtocolMessage.Header header = new ProtocolMessage.Header(); header.setMagic(ProtocolConstant.PROTOCOL_MAGIC); header.setVersion(ProtocolConstant.PROTOCOL_VERSION); header.setSerializer((byte) ProtocolMessageSerializerEnum.getEnumByValue(RpcApplication.getRpcConfig().getSerializer()).getKey()); header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey()); // 生成全局请求 ID header.setRequestId(IdUtil.getSnowflakeNextId()); protocolMessage.setHeader(header); protocolMessage.setBody(rpcRequest); // 编码请求 try { Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage); socket.write(encodeBuffer); } catch (IOException e) { throw new RuntimeException("协议消息编码错误"); } // 接收响应 TcpBufferHandlerWrapper bufferHandlerWrapper = new TcpBufferHandlerWrapper( buffer -> { try { ProtocolMessage<RpcResponse> rpcResponseProtocolMessage = (ProtocolMessage<RpcResponse>) ProtocolMessageDecoder.decode(buffer); responseFuture.complete(rpcResponseProtocolMessage.getBody()); } catch (IOException e) { throw new RuntimeException("协议消息解码错误"); } } ); socket.handler(bufferHandlerWrapper); });
RpcResponse rpcResponse = responseFuture.get();
// 记得关闭连接
netClient.close();
return rpcResponse;
}
}
- 发送请求
1 |
|
package com.lkj.yurpc.protocol;
import com.lkj.yurpc.serializer.Serializer;
import com.lkj.yurpc.serializer.SerializerFactory;
import io.vertx.core.buffer.Buffer;
import java.io.IOException;
/**
- 协议消息编码器
public class ProtocolMessageEncoder {
/**
* 编码
*
* @param protocolMessage
* @return
* @throws IOException
*/
public static Buffer encode(ProtocolMessage<?> protocolMessage) throws IOException {
if (protocolMessage == null || protocolMessage.getHeader() == null) {
return Buffer.buffer();
}
ProtocolMessage.Header header = protocolMessage.getHeader();
// 依次向缓冲区写入字节
Buffer buffer = Buffer.buffer();
buffer.appendByte(header.getMagic());
buffer.appendByte(header.getVersion());
buffer.appendByte(header.getSerializer());
buffer.appendByte(header.getType());
buffer.appendByte(header.getStatus());
buffer.appendLong(header.getRequestId());
// 获取序列化器
ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
if (serializerEnum == null) {
throw new RuntimeException("序列化协议不存在");
}
Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
byte[] bodyBytes = serializer.serialize(protocolMessage.getBody());
// 写入 body 长度和数据
buffer.appendInt(bodyBytes.length);
buffer.appendBytes(bodyBytes);
return buffer;
}
}
1 |
|
package com.lkj.yurpc.protocol;
import com.lkj.yurpc.model.RpcRequest;
import com.lkj.yurpc.model.RpcResponse;
import com.lkj.yurpc.serializer.Serializer;
import com.lkj.yurpc.serializer.SerializerFactory;
import io.vertx.core.buffer.Buffer;
import java.io.IOException;
/**
协议消息解码器
*/
public class ProtocolMessageDecoder {/**
- 解码
* - @param buffer
- @return
- @throws IOException
*/
public static ProtocolMessage<?> decode(Buffer buffer) throws IOException {
// 分别从指定位置读出 Buffer
ProtocolMessage.Header header = new ProtocolMessage.Header();
byte magic = buffer.getByte(0);
// 校验魔数
if (magic != ProtocolConstant.PROTOCOL_MAGIC) {
}throw new RuntimeException("消息 magic 非法");
header.setMagic(magic);
header.setVersion(buffer.getByte(1));
header.setSerializer(buffer.getByte(2));
header.setType(buffer.getByte(3));
header.setStatus(buffer.getByte(4));
header.setRequestId(buffer.getLong(5));
header.setBodyLength(buffer.getInt(13));
// 解决粘包问题,只读指定长度的数据
byte[] bodyBytes = buffer.getBytes(17, 17 + header.getBodyLength());
// 解析消息体
ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
if (serializerEnum == null) {
}throw new RuntimeException("序列化消息的协议不存在");
Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
ProtocolMessageTypeEnum messageTypeEnum = ProtocolMessageTypeEnum.getEnumByKey(header.getType());
if (messageTypeEnum == null) {
}throw new RuntimeException("序列化消息的类型不存在");
switch (messageTypeEnum) {
}case REQUEST: RpcRequest request = serializer.deserialize(bodyBytes, RpcRequest.class); return new ProtocolMessage<>(header, request); case RESPONSE: RpcResponse response = serializer.deserialize(bodyBytes, RpcResponse.class); return new ProtocolMessage<>(header, response); case HEART_BEAT: case OTHERS: default: throw new RuntimeException("暂不支持该消息类型");
}
- 解码
}
1 |
|
package com.lkj.yurpc.server.tcp;
import com.lkj.yurpc.model.RpcRequest;
import com.lkj.yurpc.model.RpcResponse;
import com.lkj.yurpc.protocol.*;
import com.lkj.yurpc.registry.LocalRegistry;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import java.io.IOException;
import java.lang.reflect.Method;
/**
TCP 请求处理器
*/
public class TcpServerHandler implements Handler{ /**
- 处理请求
* @param socket the event to handle
*/
@Override
public void handle(NetSocket socket) {
TcpBufferHandlerWrapper bufferHandlerWrapper = new TcpBufferHandlerWrapper(buffer -> {// 接受请求,解码 ProtocolMessage<RpcRequest> protocolMessage; try { protocolMessage = (ProtocolMessage<RpcRequest>) ProtocolMessageDecoder.decode(buffer); } catch (IOException e) { throw new RuntimeException("协议消息解码错误"); } RpcRequest rpcRequest = protocolMessage.getBody(); ProtocolMessage.Header header = protocolMessage.getHeader(); // 处理请求 // 构造响应结果对象 RpcResponse rpcResponse = new RpcResponse(); try { // 获取要调用的服务实现类,通过反射调用 Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName()); Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes()); Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs()); // 封装返回结果 rpcResponse.setData(result); rpcResponse.setDataType(method.getReturnType()); rpcResponse.setMessage("ok"); } catch (Exception e) { e.printStackTrace(); rpcResponse.setMessage(e.getMessage()); rpcResponse.setException(e); } // 发送响应,编码 header.setType((byte) ProtocolMessageTypeEnum.RESPONSE.getKey()); header.setStatus((byte) ProtocolMessageStatusEnum.OK.getValue()); ProtocolMessage<RpcResponse> responseProtocolMessage = new ProtocolMessage<>(header, rpcResponse); try { Buffer encode = ProtocolMessageEncoder.encode(responseProtocolMessage); socket.write(encode); } catch (IOException e) { throw new RuntimeException("协议消息编码错误"); }
});
socket.handler(bufferHandlerWrapper);
}
- 处理请求
}
1 |
|
package com.lkj.yurpc.server.tcp;
import com.lkj.yurpc.protocol.ProtocolConstant;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;
/**
- TCP 消息处理器包装
装饰者模式,使用 recordParser 对原有的 buffer 处理能力进行增强
*/
public class TcpBufferHandlerWrapper implements Handler{ /**
解析器,用于解决半包、粘包问题
*/
private final RecordParser recordParser;public TcpBufferHandlerWrapper(Handler
bufferHandler) {
recordParser = initRecordParser(bufferHandler);
}@Override
public void handle(Buffer buffer) {
recordParser.handle(buffer);
}/**
- 初始化解析器
* - @param bufferHandler
@return
*/
private RecordParser initRecordParser(HandlerbufferHandler) {
// 构造 parser
RecordParser parser = RecordParser.newFixed(ProtocolConstant.MESSAGE_HEADER_LENGTH);parser.setOutput(new Handler
() { // 初始化 int size = -1; // 一次完整的读取(头 + 体) Buffer resultBuffer = Buffer.buffer(); @Override public void handle(Buffer buffer) { // 1. 每次循环,首先读取消息头 if (-1 == size) { // 读取消息体长度 size = buffer.getInt(13); parser.fixedSizeMode(size); // 写入头信息到结果 resultBuffer.appendBuffer(buffer); } else { // 2. 然后读取消息体 // 写入体信息到结果 resultBuffer.appendBuffer(buffer); // 已拼接为完整 Buffer,执行处理 bufferHandler.handle(resultBuffer); // 重置一轮 parser.fixedSizeMode(ProtocolConstant.MESSAGE_HEADER_LENGTH); size = -1; resultBuffer = Buffer.buffer(); } }
});
return parser;
}
}
1 |
|
// 第一次
Hello, server!Hello, server!Hello, server!Hello, server!
// 第二次
Hello, server!Hello, server!Hello, server!Hello, server!
1 |
|
// 第一次
Hello, server!Hello, server!
// 第二次
Hello, server!Hello, server!Hello, server!
1 |
|
// 第三次
Hello, server!Hello, server!Hello, server!Hello, server!Hello, server!
1 |
|
if (buffer == null || buffer.length() == 0) {
throw new RuntimeException(“消息 buffer 为空”);
}
if (buffer.getBytes().length < ProtocolConstant.MESSAGE_HEADER_LENGTH) {
throw new RuntimeException(“出现了半包问题”);
}
1 |
|
// 解决粘包问题,只读指定长度的数据
byte[] bodyBytes = buffer.getBytes(17, 17 + header.getBodyLength());
1 |
|
package com.lkj.yurpc.loadbalancer;
import com.lkj.yurpc.model.ServiceMetaInfo;
import java.util.List;
import java.util.Map;
/**
负载均衡器(消费端使用)
**/
public interface LoadBalancer {/**
- 选择服务调用
* - @param requestParams 请求参数
- @param serviceMetaInfoList 可用服务列表
- @return
*/
ServiceMetaInfo select(MaprequestParams, List serviceMetaInfoList);
}
- 选择服务调用
1 |
|
package com.lkj.yurpc.loadbalancer;
import com.lkj.yurpc.model.ServiceMetaInfo;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
轮询负载均衡器
*/
public class RoundRobinLoadBalancer implements LoadBalancer {/**
当前轮询的下标
*/
private final AtomicInteger currentIndex = new AtomicInteger(0);@Override
public ServiceMetaInfo select(MaprequestParams, List serviceMetaInfoList) {
if (serviceMetaInfoList.isEmpty()) {return null;
}
// 只有一个服务,无需轮询
int size = serviceMetaInfoList.size();
if (size == 1) {return serviceMetaInfoList.get(0);
}
// 取模算法轮询
int index = currentIndex.getAndIncrement() % size;
return serviceMetaInfoList.get(index);
}
}
1 |
|
package com.lkj.yurpc.loadbalancer;
import com.lkj.yurpc.model.ServiceMetaInfo;
import java.util.List;
import java.util.Map;
import java.util.Random;
/**
随机负载均衡器
*/
public class RandomLoadBalancer implements LoadBalancer {private final Random random = new Random();
@Override
public ServiceMetaInfo select(MaprequestParams, List serviceMetaInfoList) { int size = serviceMetaInfoList.size(); if (size == 0) { return null; } // 只有 1 个服务,不用随机 if (size == 1) { return serviceMetaInfoList.get(0); } return serviceMetaInfoList.get(random.nextInt(size));
}
}1
2
3
* 实现一致性 Hash 负载均衡器package com.lkj.yurpc.loadbalancer;
import com.lkj.yurpc.model.ServiceMetaInfo;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
/**
一致性哈希负载均衡器
**/
public class ConsistentHashLoadBalancer implements LoadBalancer {/**
一致性 Hash 环,存放虚拟节点
*/
private final TreeMapvirtualNodes = new TreeMap<>(); /**
虚拟节点数
*/
private static final int VIRTUAL_NODE_NUM = 100;@Override
public ServiceMetaInfo select(MaprequestParams, List serviceMetaInfoList) {
if (serviceMetaInfoList.isEmpty()) {return null;
}
// 构建虚拟节点环
for (ServiceMetaInfo serviceMetaInfo : serviceMetaInfoList) {for (int i = 0; i < VIRTUAL_NODE_NUM; i++) { int hash = getHash(serviceMetaInfo.getServiceAddress() + "#" + i); virtualNodes.put(hash, serviceMetaInfo); }
}
// 获取调用请求的 hash 值
int hash = getHash(requestParams);// 选择最接近且大于等于调用请求 hash 值的虚拟节点
Map.Entryentry = virtualNodes.ceilingEntry(hash);
if (entry == null) {// 如果没有大于等于调用请求 hash 值的虚拟节点,则返回环首部的节点 entry = virtualNodes.firstEntry();
}
return entry.getValue();
}
/**
* Hash 算法,可自行实现
*
* @param key
* @return
*/
private int getHash(Object key) {
return key.hashCode();
}
}
1 |
|
package com.lkj.yurpc.loadbalancer;
/**
负载均衡器键名常量
**/
public interface LoadBalancerKeys {/**
轮询
*/
String ROUND_ROBIN = “roundRobin”;String RANDOM = “random”;
String CONSISTENT_HASH = “consistentHash”;
}
1 |
|
package com.lkj.yurpc.loadbalancer;
import com.lkj.yurpc.spi.SpiLoader;
/**
负载均衡器工厂(工厂模式,用于获取负载均衡器对象)
**/
public class LoadBalancerFactory {static {
SpiLoader.load(LoadBalancer.class);
}
/**
默认负载均衡器
*/
private static final LoadBalancer DEFAULT_LOAD_BALANCER = new RoundRobinLoadBalancer();/**
- 获取实例
* - @param key
- @return
*/
public static LoadBalancer getInstance(String key) {
return SpiLoader.getInstance(LoadBalancer.class, key);
}
}
1 |
|
/**
* 负载均衡器
*/
private String loadBalancer = LoadBalancerKeys.ROUND_ROBIN;
1 |
|
package com.lkj.yurpc.fault.retry;
import com.lkj.yurpc.model.RpcResponse;
import java.util.concurrent.Callable;
/**
重试策略
**/
public interface RetryStrategy {/**
- 重试
* - @param callable
- @return
- @throws Exception
*/
RpcResponse doRetry(Callablecallable) throws Exception;
}
- 重试
1 |
|
package com.lkj.yurpc.fault.retry;
import com.lkj.yurpc.model.RpcResponse;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Callable;
/**
不重试 - 重试策略
**/
@Slf4j
public class NoRetryStrategy implements RetryStrategy {/**
- 重试
* - @param callable
- @return
- @throws Exception
*/
public RpcResponse doRetry(Callablecallable) throws Exception {
return callable.call();
}
- 重试
}
1 |
|
package com.lkj.yurpc.fault.retry;
import com.github.rholder.retry.*;
import com.lkj.yurpc.model.RpcResponse;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
固定时间间隔 - 重试策略
**/
@Slf4j
public class FixedIntervalRetryStrategy implements RetryStrategy {/**
- 重试
* - @param callable
- @return
- @throws ExecutionException
- @throws RetryException
*/
public RpcResponse doRetry(Callablecallable) throws ExecutionException, RetryException {
Retryerretryer = RetryerBuilder. newBuilder()
return retryer.call(callable);.retryIfExceptionOfType(Exception.class) .withWaitStrategy(WaitStrategies.fixedWait(3L, TimeUnit.SECONDS)) .withStopStrategy(StopStrategies.stopAfterAttempt(3)) .withRetryListener(new RetryListener() { @Override public <V> void onRetry(Attempt<V> attempt) { log.info("重试次数 {}", attempt.getAttemptNumber()); } }) .build();
}
- 重试
}
1 |
|
package com.lkj.yurpc.fault.retry;
/**
重试策略键名常量
**/
public interface RetryStrategyKeys {/**
不重试
*/
String NO = “no”;/**
- 固定时间间隔
*/
String FIXED_INTERVAL = “fixedInterval”;
}
1 |
|
package com.lkj.yurpc.fault.retry;
import com.lkj.yurpc.spi.SpiLoader;
/**
重试策略工厂(用于获取重试器对象)
**/
public class RetryStrategyFactory {static {
SpiLoader.load(RetryStrategy.class);
}
/**
默认重试器
*/
private static final RetryStrategy DEFAULT_RETRY_STRATEGY = new NoRetryStrategy();/**
- 获取实例
* - @param key
- @return
*/
public static RetryStrategy getInstance(String key) {
return SpiLoader.getInstance(RetryStrategy.class, key);
}
}
1 |
|
// 使用重试机制
RetryStrategy retryStrategy = RetryStrategyFactory.getInstance(rpcConfig.getRetryStrategy());
RpcResponse rpcResponse = retryStrategy.doRetry(() ->
VertxTcpClient.doRequest(rpcRequest, selectedServiceMetaInfo)
);
1 |
|
package com.lkj.yurpc.fault.tolerant;
import com.lkj.yurpc.model.RpcResponse;
import java.util.Map;
/**
容错策略
**/
public interface TolerantStrategy {/**
- 容错
* - @param context 上下文,用于传递数据
- @param e 异常
- @return
*/
RpcResponse doTolerant(Mapcontext, Exception e);
}package com.lkj.yurpc.fault.tolerant;1
2
3
4
5
* 快速失败容错策略实现
遇到异常后,将异常再次抛出,交给外层实现
- 容错
import com.lkj.yurpc.model.RpcResponse;
import java.util.Map;
/**
快速失败 - 容错策略(立刻通知外层调用方)
**/
public class FailFastTolerantStrategy implements TolerantStrategy {@Override
public RpcResponse doTolerant(Mapcontext, Exception e) { throw new RuntimeException("服务报错", e);
}
}
1 |
|
package com.lkj.yurpc.fault.tolerant;
import com.lkj.yurpc.model.RpcResponse;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
/**
静默处理异常 - 容错策略
**/
@Slf4j
public class FailSafeTolerantStrategy implements TolerantStrategy {@Override
public RpcResponse doTolerant(Mapcontext, Exception e) { log.info("静默处理异常", e); return new RpcResponse();
}
}
1 |
|
package com.lkj.yurpc.fault.tolerant;
/**
容错策略键名常量
**/
public interface TolerantStrategyKeys {/**
故障恢复
*/
// String FAIL_BACK = “failBack”;/**
快速失败
*/
String FAIL_FAST = “failFast”;/**
故障转移
*/
// String FAIL_OVER = “failOver”;/**
- 静默处理
*/
String FAIL_SAFE = “failSafe”;
}
1 |
|
package com.lkj.yurpc.fault.tolerant;
import com.lkj.yurpc.spi.SpiLoader;
/**
容错策略工厂(工厂模式,用于获取容错策略对象)
**/
public class TolerantStrategyFactory {static {
SpiLoader.load(TolerantStrategy.class);
}
/**
默认容错策略
*/
private static final TolerantStrategy DEFAULT_RETRY_STRATEGY = new FailFastTolerantStrategy();/**
- 获取实例
* - @param key
- @return
*/
public static TolerantStrategy getInstance(String key) {
return SpiLoader.getInstance(TolerantStrategy.class, key);
}
}
1 |
|
// rpc 请求
// 使用重试机制
RpcResponse rpcResponse;
try {
RetryStrategy retryStrategy = RetryStrategyFactory.getInstance(rpcConfig.getRetryStrategy());
rpcResponse = retryStrategy.doRetry(() ->
VertxTcpClient.doRequest(rpcRequest, selectedServiceMetaInfo)
);
} catch (Exception e) {
// 容错机制
TolerantStrategy tolerantStrategy = TolerantStrategyFactory.getInstance(rpcConfig.getTolerantStrategy());
rpcResponse = tolerantStrategy.doTolerant(null, e);
}
return rpcResponse.getData();
```