0%

JAVA面试题

Redis

使用场景

缓存

缓存穿透

  • 定义:查询一个不存在数据,每次查询都请求数据库
  • 解决方案一:缓存空数据。

优点:简单

缺点:消耗内存,可能发生不一致问题

  • 方案二:布隆过滤器

优点:内存占用少,没有多余key

缺点:实现复杂,存在误判

缓存击穿

  • 定义:给某一个key设置了过期时间,当key过期的时候,恰好这个时间点对这个key有大量的并发请求过来,这些并发请求可能会瞬间把DB压垮。
  • 解决方案一:互斥锁

image-20241113173234318

可以保证数据强一致性,但是性能较差

  • 方案二:逻辑过期

image-20241113173410421

性能较高,不能保证数据绝对一致。

缓存雪崩

  • 定义:同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库

image-20241113173610988

双写一致性

image-20241113174048777

一致性高:

image-20241113174219179

读写锁效率更高一点。

一致性要求不高:

image-20241113174336123

允许延时一致。

image-20241113174506462

持久化

  • RDB

image-20241113175006561

  • AOF

image-20241113180658834

image-20241113180919940

数据过期策略

  • 惰性删除

image-20241113181025752

  • 定时删除

image-20241113181041510

Redis的过期策略是二者结合

数据淘汰策略

image-20241113181313121

image-20241113181334256

分布式锁

image-20241113182238011

image-20241113182418127

image-20241206103952684

redisson实现的分布式锁-可重入。

其他

Redis集群

主从复制

image-20241113183241827

全量同步:

image-20241113183404603

增量同步:

image-20241113183902910

哨兵

image-20241113184040244

image-20241113184200081

分片集群

image-20241113184906722

image-20241113185253280

其他

image-20241113185344002

MYSQL

优化

定位慢查询

image-20241113193527339

image-20241113193610868

如何分析

image-20241113194008431

image-20241113194108483

image-20241113194144778

image-20241113194235780

索引

image-20241113194829230

image-20241113195204510

image-20241113200121864

覆盖索引

image-20241113200156128

超大分页

image-20241113200333291

image-20241113200355259

索引创建原则

image-20241113200928741

索引失效

image-20241113201501827

image-20241113201640636

image-20241113201654548

image-20241113201714881

image-20241113201757677

经验

image-20241113202207342

  • 表的设计优化

image-20241113202252800

  • SQL语句优化

image-20241113202318812

  • 主从复制、读写分离

image-20241113202522118

事务

特性

image-20241113202950902

并发事务

image-20241114134351047

解决方法

image-20241114134617042

log

image-20241114134947783

image-20241114135054074

image-20241114135133135

MVCC

image-20241114135315206

image-20241206104426924

主从同步原理

image-20241114135757985

image-20241206111130223

分库分表

image-20241206160019201

框架

Spring

bean

spring中的单例bean不是线程安全的。

image-20241114140622142

AOP

  • 定义

image-20241114141048318

image-20241114141748278

事务失效场景

image-20241114142404580

image-20241114142553578

image-20241114142716007

image-20241114142744275

bean的生命周期

image-20241206161638418

循环引用

image-20241114150316128

SpringMVC执行流程

image-20241114151239697

image-20241114151348773

Springboot自动配置原理

自动装配,简单来说就是自动把第三方组件的 Bean 装载到 Spring IOC 器里面,不需要开发人员再去写 Bean 的装配配置。

在 Spring Boot 应用里面,只需要在启动类加上@SpringBootApplication 注解就可以实现自动装配。

image-20241114152123776

常见注解

image-20241114152423611

MyBatis

执行流程

image-20241114153450089

延迟加载

image-20241114154312368

image-20241114154940442

一、二级缓存

image-20241114161250932

集合

List

数组

ArrayList源码

image-20241115125549338

ArrayList底层原理

image-20241115131352180

数组和List之间的转换

image-20241115131634843

image-20241115132854166

image-20241115133642645

HashMap

  • 散列表

image-20241115134428936

image-20241115134435017

实现原理

image-20241115135057467

PUT

image-20241115140059260

扩容机制

image-20241115142626905

image-20241115142849019

寻址算法

image-20241115143951187

image-20241115144012925

多线程

线程基础

进程和线程

image-20241115145010258

创建线程方式

image-20241115145725636

image-20241115145832750

image-20241115145947677

状态

image-20241115150521157

顺序执行

image-20241115150736530

image-20241115151017584

wait和sleep

image-20241115154722556

如何终止线程

image-20241115155204426

线程安全

synchronized底层原理

image-20241116085705439

底层实现是Monitor

image-20241116090021112

image-20241116091004674

一旦锁发生竞争,都会升级为重量级锁。

JMM

image-20241116091838454

CAS

image-20241116092430536

volatile

  • 保证线程间可见性

image-20241116093135680

  • 禁止指令重排序

image-20241116093215311

image-20241116094023739

AQS

image-20241116094116845

image-20241116094652430

image-20241205162529721

AQS核⼼思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的⼯作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占⽤,那么就需要⼀套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是⽤CLH队列锁实现的,即将暂时获取不到锁的线程加⼊到队列中。

ReentrantLock

image-20241116095154783

synchronized和Lock

image-20241116100046920

死锁

image-20241116100927080

ConcurrentHashMap

image-20241116101451404

并发问题

image-20241116101745631

线程池

线程池核心参数

image-20241117190318040

image-20241117193538266

常见阻塞队列

image-20241117194306438

如何缺点核心线程数

image-20241117194847129

线程池种类

image-20241117195111040

使用场景

image-20241206162046309

JVM

JVM组成

程序计数器

image-20241117200427200

java堆

image-20241117200756694

虚拟机栈

image-20241118144631044

image-20241118144647293

方法区

image-20241118144710602

直接内存

image-20241118144737789

类装载过程

image-20241118144351326

垃圾回收

何时回收

image-20241118145115656

引用计数法可能会循环引用,造成泄露,所以现在经常用可达性分析。

垃圾回收算法

image-20241118145357265

分代回收

image-20241118145731845

引用

image-20241118174802208

内存泄露排查思路

image-20241206170711343

CPU排查思路

image-20241206170735783

企业场景

设计模式

工厂设计模式

image-20241118180800590

策略模式

image-20241118182117601

责任链模式

image-20241118182228159

技术场景

单点登录

image-20241118182524832

image-20241118182657373

权限认证

image-20241118183410591

image-20241118183419380

上传数据安全性

image-20241118183758848

棘手问题

采集日志

怎么快速定位系统瓶颈

image-20241118192428648

补充1

基础

  • 重载和重写

image-20241205221942136

  • 装箱和拆箱

image-20241205222002926

  • == equals

image-20241205222039872

  • hashCode equals

image-20241205222159699

  • Java 中的异常处理

image-20241205222228335

image-20241205222241278

image-20241205222250416

  • Java 序列化中如果有些字段不想进⾏序列化,怎么办?

对于不想进⾏序列化的变量,使⽤ transient 关键字修饰。

transient 关键字的作⽤是:阻⽌实例中那些⽤此关键字修饰的的变量序列化;当对象被反序列化时,被 transient 修饰的变量值不会被持久化和恢复。transient 只能修饰变量,不能修饰类和⽅法。

  • Java 中 IO 流

    image-20241205222418104

image-20241205222356524

image-20241207184109427

NIO:基于同步非阻塞 I/O 模型。尽管 NIO 支持非阻塞模式,但所有的 I/O 操作最终都是同步完成的。这意味着当一个 I/O 操作被发起时,它不会立即返回结果,而是需要应用程序通过轮询或选择器来检查操作是否完成。

AIO:真正的异步 I/O 模型。在 AIO 中,所有的 I/O 操作都是异步完成的。一旦发起一个 I/O 请求,该请求将立即返回,允许程序继续执行其他任务。操作系统会在 I/O 操作完成后通知应用程序。

  • 集合框架底层数据结构总结

image-20241205222540249

  • 进程和线程

image-20241205222628899

image-20241205222706738

  • 死锁

image-20241205222735487

  • 为什么我们调⽤ start() ⽅法时会执⾏run() ⽅法,为什么我们不能直接调⽤run() ⽅法

image-20241205222818227

  • 双重校验锁实现对象单例(线程安全)

image-20241205223056871

  • java对象创建过程

image-20241205223644141

image-20241205223653879

  • 对象的访问定位有哪两种⽅式?

image-20241205223725944

  • 常⻅的垃圾回收器有那些?

image-20241205223807138

Serial收集器

Serial(串⾏)收集器收集器是最基本、历史最悠久的垃圾收集器了。⼤家看名字就知道这个收集器是⼀个单线程收集器了。它的 单线程 的意义不仅仅意味着它只会使⽤⼀条垃圾收集线程去完成垃圾收集⼯作,更重要的是它在进⾏垃圾收集⼯作的时候必须暂停其他所有的⼯作线程( “Stop The World”),直到它收集结束。

ParNew收集器

ParNew收集器其实就是Serial收集器的多线程版本,除了使⽤多线程进⾏垃圾收集外,其余⾏为(控参数、收集算法、回收策略等等)和Serial收集器完全⼀样。

Parallel Scavenge收集器

image-20241205224253533

image-20241205224318164

image-20241205224329444

  • 双亲委派模型

image-20241205224433027

image-20241205224439218

  • image-20241205224622648
  • ⼀条SQL语句在MySQL中如何执⾏的

image-20241205225225043

image-20241205225340608

  • 索引实现为什么是B树而不是红黑树

image-20241206104728029

  • 受检异常和非受检异常

image-20241206105050884

  • b+而不是b

image-20241206105931070

  • 守护线程

image-20241206110838097

  • mysql索引类型

image-20241206111247698

  • springboot约定优先于配置

image-20241206112124441

image-20241206112132390

  • Spring中事务的传播行为

image-20241206114452006

image-20241206114500379

  • Dubbo如何动态感知服务下线

image-20241206114748010

image-20241206114801949

当 Dubbo 服务提供方出现故障导致 Zookeeper 剔除了这个服务的地址,

image-20241206114823628

  • Mybatis 中#{}和${}的区别是什么?

image-20241206115154486

  • 数据库连接池

数据库连接池是一种池化技术,池化技术的核心思想是实现资源的复用,避免资源重复创建销毁的开销。

而在数据库的应用场景里面,应用程序每次向数据库发起 CRUD 操作的时候,都需要创建连接在数据库访问量较大的情况下,频繁的创建连接会带来较大的性能开销。

(如图)而连接池的核心思想,就是应用程序在启动的时候提前初始化一部分连接保存到连接池里面,当应用需要使用连接的时候,直接从连接池获取一个已经建立好的链接。连接池的设计,避免了每次连接的建立和释放带来的开销。

image-20241206115308060

  • new String(“abc”)到底创建了几个对象?

image-20241206115527338

  • String、StringBuffer、StringBuilder 区别

image-20241206143347813

  • JVM年龄代为什么是15次

一个对象的GC年龄是储存在对象头里的,而对象头里有4位储存GC年龄,最大值为15。

  • 深拷贝和浅拷贝

image-20241206145323610

  • Spring IOC 和 DI

image-20241206145508721

image-20241206145518569

image-20241206145529526

  • finally块一定执行吗

image-20241206150428445

  • Integer 和 int 的区别

image-20241206151407495

  • 零拷贝
  • 在2G文件中,找出高频top100

image-20241206152107404

image-20241206152114028

  • 表数据量大的时候,影响查询效率的主要原因

image-20241206152310633

  • 两个 Integer 对象比较大小,为什么 100 等于 100,1000 不等于 1000

因为Intefer的valueOf方法,判断时如果目标值在-128-127则会直接从cache取值。

  • MQ(消息中间件)

MQ 全称是 Message Queue,直译过来叫做消息队列,主要是作为分布式应用之间实现异步通信的方式。

主要由三个部分组成,分别是生产者、消息服务端和消费者

image-20241206153425201

流量消峰

image-20241206153442652

应用解耦

image-20241206153455894

异步处理

image-20241206153529960

  • 给你 ab,ac,abc 字段,你是如何加索引 的

image-20241206154635372

  • 补充2

  • CPU飚高系统反应慢怎么排查

image-20241205162936984

  • Dubbo的服务请求失败怎么处理

image-20241205163458787

  • 如何实现Redis和Mysql的一致性

当应用程序需要去读取某个数据的时候,首先会先尝试去 Redis 里面加载,如果命中就直接返回。如果没有命中,就从数据库查询,查询到数据后再把这个数据缓存到 Redis里面。

在这种情况下,能够选择的方法只有几种。

  1. 先更新数据库,再更新缓存

  2. 先删除缓存,再更新数据库

如果先更新数据库,再更新缓存,如果缓存更新失败,就会导致数据库和 Redis 中的数据不一致。

如果是先删除缓存,再更新数据库,理想情况是应用下次访问 Redis 的时候,发现 Redis里面的数据是空的,就从数据库加载保存到 Redis 里面,那么数据是一致的。但是在极端情况下,由于删除 Redis 和更新数据库这两个操作并不是原子的,所以这个过程如果有其他线程来访问,还是会存在数据不一致问题。

所以,如果需要在极端情况下仍然保证 Redis 和 Mysql 的数据一致性,就只能采用最终一致性方案。(如图)比如基于 RocketMQ 的可靠性消息通信,来实现最终一致性。

image-20241205165736374

因为这里是基于最终一致性来实现的,如果业务场景不能接受数据的短期不一致性,那就不能使用这个方案来做。

  • Dubbo核心功能

image-20241205170107231

image-20241205170117694

  • Dubbo工作原理

image-20241205170215391

  • Mysql优化

image-20241205171151724

image-20241205171234879

image-20241205171242190

image-20241205171253143

image-20241205171303324

RPC核心代码

基础

编写基于Vert.x实现的web服务器VertxHttpServer,能够监听指定端口并处理请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.yupi.yurpc.server;

import io.vertx.core.Vertx;

public class VertxHttpServer implements HttpServer {

public void doStart(int port) {
// 创建 Vert.x 实例
Vertx vertx = Vertx.vertx();

// 创建 HTTP 服务器
io.vertx.core.http.HttpServer server = vertx.createHttpServer();

// 监听端口并处理请求
server.requestHandler(request -> {
// 处理 HTTP 请求
System.out.println("Received request: " + request.method() + " " + request.uri());

// 发送 HTTP 响应
request.response()
.putHeader("content-type", "text/plain")
.end("Hello from Vert.x HTTP server!");
});

// 启动 HTTP 服务器并监听指定端口
server.listen(port, result -> {
if (result.succeeded()) {
System.out.println("Server is now listening on port " + port);
} else {
System.err.println("Failed to start server: " + result.cause());
}
});
}
}

本地服务注册中心

image-20241213144820474

image-20241213144835523

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.yupi.yurpc.registry;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* 本地注册中心
*/
public class LocalRegistry {

/**
* 注册信息存储
*/
private static final Map<String, Class<?>> map = new ConcurrentHashMap<>();

/**
* 注册服务
*
* @param serviceName
* @param implClass
*/
public static void register(String serviceName, Class<?> implClass) {
map.put(serviceName, implClass);
}

/**
* 获取服务
*
* @param serviceName
* @return
*/
public static Class<?> get(String serviceName) {
return map.get(serviceName);
}

/**
* 删除服务
*
* @param serviceName
*/
public static void remove(String serviceName) {
map.remove(serviceName);
}
}

请求处理器

image-20241213144959372

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package com.yupi.yurpc.server;

import com.yupi.yurpc.model.RpcRequest;
import com.yupi.yurpc.model.RpcResponse;
import com.yupi.yurpc.registry.LocalRegistry;
import com.yupi.yurpc.serializer.JdkSerializer;
import com.yupi.yurpc.serializer.Serializer;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;

import java.io.IOException;
import java.lang.reflect.Method;

/**
* HTTP 请求处理
*/
public class HttpServerHandler implements Handler<HttpServerRequest> {

@Override
public void handle(HttpServerRequest request) {
// 指定序列化器
final Serializer serializer = new JdkSerializer();

// 记录日志
System.out.println("Received request: " + request.method() + " " + request.uri());

// 异步处理 HTTP 请求
request.bodyHandler(body -> {
byte[] bytes = body.getBytes();
RpcRequest rpcRequest = null;
try {
rpcRequest = serializer.deserialize(bytes, RpcRequest.class);
} catch (Exception e) {
e.printStackTrace();
}

// 构造响应结果对象
RpcResponse rpcResponse = new RpcResponse();
// 如果请求为 null,直接返回
if (rpcRequest == null) {
rpcResponse.setMessage("rpcRequest is null");
doResponse(request, rpcResponse, serializer);
return;
}

try {
// 获取要调用的服务实现类,通过反射调用
Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());
Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());
// 封装返回结果
rpcResponse.setData(result);
rpcResponse.setDataType(method.getReturnType());
rpcResponse.setMessage("ok");
} catch (Exception e) {
e.printStackTrace();
rpcResponse.setMessage(e.getMessage());
rpcResponse.setException(e);
}
// 响应
doResponse(request, rpcResponse, serializer);
});
}

/**
* 响应
*
* @param request
* @param rpcResponse
* @param serializer
*/
void doResponse(HttpServerRequest request, RpcResponse rpcResponse, Serializer serializer) {
HttpServerResponse httpServerResponse = request.response()
.putHeader("content-type", "application/json");
try {
// 序列化
byte[] serialized = serializer.serialize(rpcResponse);
httpServerResponse.end(Buffer.buffer(serialized));
} catch (IOException e) {
e.printStackTrace();
httpServerResponse.end(Buffer.buffer());
}
}
}

TCP版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.yupi.yurpc.server.tcp;

import com.yupi.yurpc.model.RpcRequest;
import com.yupi.yurpc.model.RpcResponse;
import com.yupi.yurpc.protocol.ProtocolMessage;
import com.yupi.yurpc.protocol.ProtocolMessageDecoder;
import com.yupi.yurpc.protocol.ProtocolMessageEncoder;
import com.yupi.yurpc.protocol.ProtocolMessageTypeEnum;
import com.yupi.yurpc.registry.LocalRegistry;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;

import java.io.IOException;
import java.lang.reflect.Method;

public class TcpServerHandler implements Handler<NetSocket> {

@Override
public void handle(NetSocket netSocket) {
// 处理连接
netSocket.handler(buffer -> {
// 接受请求,解码
ProtocolMessage<RpcRequest> protocolMessage;
try {
protocolMessage = (ProtocolMessage<RpcRequest>) ProtocolMessageDecoder.decode(buffer);
} catch (IOException e) {
throw new RuntimeException("协议消息解码错误");
}
RpcRequest rpcRequest = protocolMessage.getBody();

// 处理请求
// 构造响应结果对象
RpcResponse rpcResponse = new RpcResponse();
try {
// 获取要调用的服务实现类,通过反射调用
Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());
Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());
// 封装返回结果
rpcResponse.setData(result);
rpcResponse.setDataType(method.getReturnType());
rpcResponse.setMessage("ok");
} catch (Exception e) {
e.printStackTrace();
rpcResponse.setMessage(e.getMessage());
rpcResponse.setException(e);
}

// 发送响应,编码
ProtocolMessage.Header header = protocolMessage.getHeader();
header.setType((byte) ProtocolMessageTypeEnum.RESPONSE.getKey());
ProtocolMessage<RpcResponse> responseProtocolMessage = new ProtocolMessage<>(header, rpcResponse);
try {
Buffer encode = ProtocolMessageEncoder.encode(responseProtocolMessage);
netSocket.write(encode);
} catch (IOException e) {
throw new RuntimeException("协议消息编码错误");
}
});
}
}

请求发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package com.yupi.yurpc.proxy;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.yupi.yurpc.RpcApplication;
import com.yupi.yurpc.config.RpcConfig;
import com.yupi.yurpc.constant.RpcConstant;
import com.yupi.yurpc.model.RpcRequest;
import com.yupi.yurpc.model.RpcResponse;
import com.yupi.yurpc.model.ServiceMetaInfo;
import com.yupi.yurpc.protocol.*;
import com.yupi.yurpc.registry.Registry;
import com.yupi.yurpc.registry.RegistryFactory;
import com.yupi.yurpc.serializer.Serializer;
import com.yupi.yurpc.serializer.SerializerFactory;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;

import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

/**
* 服务代理(JDK 动态代理)
*
* @author <a href="https://github.com/liyupi">程序员鱼皮</a>
* @learn <a href="https://codefather.cn">编程宝典</a>
* @from <a href="https://yupi.icu">编程导航知识星球</a>
*/
public class ServiceProxy implements InvocationHandler {

/**
* 调用代理
*
* @return
* @throws Throwable
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 指定序列化器
final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());

// 构造请求
String serviceName = method.getDeclaringClass().getName();
RpcRequest rpcRequest = RpcRequest.builder()
.serviceName(serviceName)
.methodName(method.getName())
.parameterTypes(method.getParameterTypes())
.args(args)
.build();
try {
// 序列化
byte[] bodyBytes = serializer.serialize(rpcRequest);
// 从注册中心获取服务提供者请求地址
RpcConfig rpcConfig = RpcApplication.getRpcConfig();
Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
serviceMetaInfo.setServiceName(serviceName);
serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());
if (CollUtil.isEmpty(serviceMetaInfoList)) {
throw new RuntimeException("暂无服务地址");
}
ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);
// 发送 TCP 请求
Vertx vertx = Vertx.vertx();
NetClient netClient = vertx.createNetClient();
CompletableFuture<RpcResponse> responseFuture = new CompletableFuture<>();
netClient.connect(selectedServiceMetaInfo.getServicePort(), selectedServiceMetaInfo.getServiceHost(),
result -> {
if (result.succeeded()) {
System.out.println("Connected to TCP server");
io.vertx.core.net.NetSocket socket = result.result();
// 发送数据
// 构造消息
ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<>();
ProtocolMessage.Header header = new ProtocolMessage.Header();
header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);
header.setVersion(ProtocolConstant.PROTOCOL_VERSION);
header.setSerializer((byte) ProtocolMessageSerializerEnum.getEnumByValue(RpcApplication.getRpcConfig().getSerializer()).getKey());
header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());
header.setRequestId(IdUtil.getSnowflakeNextId());
protocolMessage.setHeader(header);
protocolMessage.setBody(rpcRequest);
// 编码请求
try {
Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);
socket.write(encodeBuffer);
} catch (IOException e) {
throw new RuntimeException("协议消息编码错误");
}

// 接收响应
socket.handler(buffer -> {
try {
ProtocolMessage<RpcResponse> rpcResponseProtocolMessage = (ProtocolMessage<RpcResponse>) ProtocolMessageDecoder.decode(buffer);
responseFuture.complete(rpcResponseProtocolMessage.getBody());
} catch (IOException e) {
throw new RuntimeException("协议消息解码错误");
}
});
} else {
System.err.println("Failed to connect to TCP server");
}
});

RpcResponse rpcResponse = responseFuture.get();
// 记得关闭连接
netClient.close();
return rpcResponse.getData();
} catch (IOException e) {
e.printStackTrace();
}

return null;
}
}

序列化器

序列化器JSON

JSON序列化器需要考虑对象转换的兼容问题,主要是因为Java语言中的泛型擦除(Type Erasure)机制和JSON数据格式本身的特性。

  1. 泛型擦除: 在Java中,泛型信息在编译后会被擦除,这意味着运行时无法直接获取泛型参数的具体类型信息。例如,List<String>List<Integer> 在运行时都被视为 List 类型。当使用Jackson等库进行反序列化时,默认情况下它们可能不知道如何将JSON对象映射回原始的Java泛型类型,可能会默认返回如 LinkedHashMap 这样的类型。
  2. JSON与Java对象模型差异: JSON是一种轻量级的数据交换格式,它没有像Java那样的复杂类型系统。JSON只支持几种基本类型(字符串、数字、布尔值、数组、对象和null)。因此,当从JSON反序列化到Java对象时,有时候JSON结构并不能完全对应Java类的结构,特别是对于复杂的嵌套对象或自定义类型。

Kryo 是一个专门为Java设计的高效序列化框架,它与JSON序列化器如Jackson相比,在处理对象转换兼容性问题上有不同的特性,这使得Kryo在某些情况下不需要特别考虑类型转换的兼容问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.yupi.yurpc.serializer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.yupi.yurpc.model.RpcRequest;
import com.yupi.yurpc.model.RpcResponse;

import java.io.IOException;


public class JsonSerializer implements Serializer {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@Override
public <T> byte[] serialize(T obj) throws IOException {
return OBJECT_MAPPER.writeValueAsBytes(obj);
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> classType) throws IOException {
T obj = OBJECT_MAPPER.readValue(bytes, classType);
if (obj instanceof RpcRequest) {
return handleRequest((RpcRequest) obj, classType);
}
if (obj instanceof RpcResponse) {
return handleResponse((RpcResponse) obj, classType);
}
return obj;
}

/**
* 由于 Object 的原始对象会被擦除,导致反序列化时会被作为 LinkedHashMap 无法转换成原始对象,因此这里做了特殊处理
*
* @param rpcRequest rpc 请求
* @param type 类型
* @return {@link T}
* @throws IOException IO异常
*/
private <T> T handleRequest(RpcRequest rpcRequest, Class<T> type) throws IOException {
Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
Object[] args = rpcRequest.getArgs();

// 循环处理每个参数的类型
for (int i = 0; i < parameterTypes.length; i++) {
Class<?> clazz = parameterTypes[i];
// 如果类型不同,则重新处理一下类型
if (!clazz.isAssignableFrom(args[i].getClass())) {
byte[] argBytes = OBJECT_MAPPER.writeValueAsBytes(args[i]);
args[i] = OBJECT_MAPPER.readValue(argBytes, clazz);
}
}
return type.cast(rpcRequest);
}

/**
* 由于 Object 的原始对象会被擦除,导致反序列化时会被作为 LinkedHashMap 无法转换成原始对象,因此这里做了特殊处理
*
* @param rpcResponse rpc 响应
* @param type 类型
* @return {@link T}
* @throws IOException IO异常
*/
private <T> T handleResponse(RpcResponse rpcResponse, Class<T> type) throws IOException {
// 处理响应数据
byte[] dataBytes = OBJECT_MAPPER.writeValueAsBytes(rpcResponse.getData());
rpcResponse.setData(OBJECT_MAPPER.readValue(dataBytes, rpcResponse.getDataType()));
return type.cast(rpcResponse);
}
}

kryo序列器

image-20241213150853882

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.yupi.yurpc.serializer;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

/**
* Kryo 序列化器
*
* @author <a href="https://github.com/liyupi">程序员鱼皮</a>
* @learn <a href="https://codefather.cn">编程宝典</a>
* @from <a href="https://yupi.icu">编程导航知识星球</a>
*/
public class KryoSerializer implements Serializer {
/**
* kryo 线程不安全,使用 ThreadLocal 保证每个线程只有一个 Kryo
*/
private static final ThreadLocal<Kryo> KRYO_THREAD_LOCAL = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
// 设置动态动态序列化和反序列化类,不提前注册所有类(可能有安全问题)
kryo.setRegistrationRequired(false);
return kryo;
});

@Override
public <T> byte[] serialize(T obj) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream);
KRYO_THREAD_LOCAL.get().writeObject(output, obj);
output.close();
return byteArrayOutputStream.toByteArray();
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> classType) {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream);
T result = KRYO_THREAD_LOCAL.get().readObject(input, classType);
input.close();
return result;
}
}

hessian

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.yupi.yurpc.serializer;

import com.caucho.hessian.io.HessianInput;
import com.caucho.hessian.io.HessianOutput;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
* Hessian 序列化器
*
* @author <a href="https://github.com/liyupi">程序员鱼皮</a>
* @learn <a href="https://codefather.cn">编程宝典</a>
* @from <a href="https://yupi.icu">编程导航知识星球</a>
*/
public class HessianSerializer implements Serializer {
@Override
public <T> byte[] serialize(T object) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
HessianOutput ho = new HessianOutput(bos);
ho.writeObject(object);
return bos.toByteArray();
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> tClass) throws IOException {
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
HessianInput hi = new HessianInput(bis);
return (T) hi.readObject(tClass);
}
}

注册中心

  • 由于一个服务可能有多个提供者,有两种设计结构

image-20241213151432331

image-20241213151441791

image-20241213151453952

对于ZooKeeper和Etcd这种支持层级查询的中间件,用第一种更加清晰。

注册信息定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.yupi.yurpc.model;

/**
* 服务元信息(注册信息)
*/
public class ServiceMetaInfo {


/**
* 服务名称
*/
private String serviceName;

/**
* 服务版本号
*/
private String serviceVersion = "1.0";

/**
* 服务域名
*/
private String serviceHost;

/**
* 服务端口号
*/
private Integer servicePort;

/**
* 服务分组(暂未实现)
*/
private String serviceGroup = "default";

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 获取服务键名
*
* @return
*/
public String getServiceKey() {
// 后续可扩展服务分组
// return String.format("%s:%s:%s", serviceName, serviceVersion, serviceGroup);
return String.format("%s:%s", serviceName, serviceVersion);
}

/**
* 获取服务注册节点键名
*
* @return
*/
public String getServiceNodeKey() {
return String.format("%s/%s:%s", getServiceKey(), serviceHost, servicePort);
}

注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {
// 创建 Lease 和 KV 客户端
Lease leaseClient = client.getLeaseClient();

// 创建一个 30 秒的租约
long leaseId = leaseClient.grant(30).get().getID();

// 设置要存储的键值对
String registerKey = ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey();
ByteSequence key = ByteSequence.from(registerKey, StandardCharsets.UTF_8);
ByteSequence value = ByteSequence.from(JSONUtil.toJsonStr(serviceMetaInfo), StandardCharsets.UTF_8);

// 将键值对与租约关联起来,并设置过期时间
PutOption putOption = PutOption.builder().withLeaseId(leaseId).build();
kvClient.put(key, value, putOption).get();
}

假设有一个服务实例:

  • serviceName: “my-service”
  • serviceVersion: “1.0” (默认值)
  • serviceHost: “192.168.1.10”
  • servicePort: 8080

image-20241213152013472

服务发现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) {
// 前缀搜索,结尾一定要加 '/'
String searchPrefix = ETCD_ROOT_PATH + serviceKey + "/";

try {
// 前缀查询
GetOption getOption = GetOption.builder().isPrefix(true).build();
List<KeyValue> keyValues = kvClient.get(
ByteSequence.from(searchPrefix, StandardCharsets.UTF_8),
getOption)
.get()
.getKvs();
// 解析服务信息
return keyValues.stream()
.map(keyValue -> {
String value = keyValue.getValue().toString(StandardCharsets.UTF_8);
return JSONUtil.toBean(value, ServiceMetaInfo.class);
})
.collect(Collectors.toList());
} catch (Exception e) {
throw new RuntimeException("获取服务列表失败", e);
}
}

这段代码实现了基于 etcd 的服务发现功能,具体步骤如下:

  1. 构建查询前缀:根据给定的服务键构建一个特定的路径前缀。
  2. 配置查询选项:设置查询条件为前缀匹配。
  3. 执行查询:向 etcd 发起查询请求,获取所有匹配的服务实例。
  4. 解析结果:将查询结果中的每个键值对的值部分(JSON 字符串)反序列化为 ServiceMetaInfo 对象。
  5. 返回结果:将所有解析后的 ServiceMetaInfo 对象收集到一个列表中并返回。
  6. 异常处理:确保在出现错误时能够适当处理,并提供有用的错误信息。

心跳检测和续期机制

image-20241213152937020

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Override
public void heartBeat() {
// 10 秒续签一次
CronUtil.schedule("*/10 * * * * *", new Task() {
@Override
public void execute() {
// 遍历本节点所有的 key
for (String key : localRegisterNodeKeySet) {
try {
List<KeyValue> keyValues = kvClient.get(ByteSequence.from(key, StandardCharsets.UTF_8))
.get()
.getKvs();
// 该节点已过期(需要重启节点才能重新注册)
if (CollUtil.isEmpty(keyValues)) {
continue;
}
// 节点未过期,重新注册(相当于续签)
KeyValue keyValue = keyValues.get(0);
String value = keyValue.getValue().toString(StandardCharsets.UTF_8);
ServiceMetaInfo serviceMetaInfo = JSONUtil.toBean(value, ServiceMetaInfo.class);
register(serviceMetaInfo);
} catch (Exception e) {
throw new RuntimeException(key + "续签失败", e);
}
}
}
});

// 支持秒级别定时任务
CronUtil.setMatchSecond(true);
CronUtil.start();
}

利用重新注册实现续签,定时任务是通过 CronUtil 类来完成的,来自于Hutool

服务节点下线

image-20241213154523357

image-20241213154745096

本地缓存,用一个列表来实现

image-20241213155234761

image-20241213155313057

image-20241213155336978

自定义协议

自定义协议后TCP服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.yupi.yurpc.server.tcp;

import com.yupi.yurpc.server.HttpServer;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetServer;

public class VertxTcpServer implements HttpServer {

private byte[] handleRequest(byte[] requestData) {
// 在这里编写处理请求的逻辑,根据 requestData 构造响应数据并返回
// 这里只是一个示例,实际逻辑需要根据具体的业务需求来实现
return "Hello, client!".getBytes();
}

@Override
public void doStart(int port) {
// 创建 Vert.x 实例
Vertx vertx = Vertx.vertx();

// 创建 TCP 服务器
NetServer server = vertx.createNetServer();

// 处理请求
server.connectHandler(socket -> {
// 处理连接
socket.handler(buffer -> {
// 处理接收到的字节数组
byte[] requestData = buffer.getBytes();
// 在这里进行自定义的字节数组处理逻辑,比如解析请求、调用服务、构造响应等
byte[] responseData = handleRequest(requestData);
// 发送响应
socket.write(Buffer.buffer(responseData));
});
});

// 启动 TCP 服务器并监听指定端口
server.listen(port, result -> {
if (result.succeeded()) {
System.out.println("TCP server started on port " + port);
} else {
System.err.println("Failed to start TCP server: " + result.cause());
}
});
}

public static void main(String[] args) {
new VertxTcpServer().doStart(8888);
}
}

TCP客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.yupi.yurpc.server.tcp;

import io.vertx.core.Vertx;

public class VertxTcpClient {

public void start() {
// 创建 Vert.x 实例
Vertx vertx = Vertx.vertx();

vertx.createNetClient().connect(8888, "localhost", result -> {
if (result.succeeded()) {
System.out.println("Connected to TCP server");
io.vertx.core.net.NetSocket socket = result.result();
// 发送数据
socket.write("Hello, server!");
// 接收响应
socket.handler(buffer -> {
System.out.println("Received response from server: " + buffer.toString());
});
} else {
System.err.println("Failed to connect to TCP server");
}
});
}

public static void main(String[] args) {
new VertxTcpClient().start();
}
}

消息编码器与解码器

image-20241213155702105

半包粘包

image-20241213160311195

RecordParse的作用是:保证下次读到特定长度的字符。

  • image-20241213160435877

image-20241213160443518

负载均衡

轮询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.yupi.yurpc.loadbalancer;

import com.yupi.yurpc.model.ServiceMetaInfo;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 轮询负载均衡器
*
* @author <a href="https://github.com/liyupi">程序员鱼皮</a>
* @learn <a href="https://codefather.cn">鱼皮的编程宝典</a>
* @from <a href="https://yupi.icu">编程导航学习圈</a>
*/
public class RoundRobinLoadBalancer implements LoadBalancer {

/**
* 当前轮询的下标
*/
private final AtomicInteger currentIndex = new AtomicInteger(0);

@Override
public ServiceMetaInfo select(Map<String, Object> requestParams, List<ServiceMetaInfo> serviceMetaInfoList) {
if (serviceMetaInfoList.isEmpty()) {
return null;
}
// 只有一个服务,无需轮询
int size = serviceMetaInfoList.size();
if (size == 1) {
return serviceMetaInfoList.get(0);
}
// 取模算法轮询
int index = currentIndex.getAndIncrement() % size;
return serviceMetaInfoList.get(index);
}
}

随机

1
2
3
4
5
6
7
8

// 只有 1 个服务,不用随机
if (size == 1) {
return serviceMetaInfoList.get(0);
}
return serviceMetaInfoList.get(random.nextInt(size));


一致性Hash

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.yupi.yurpc.loadbalancer;

import com.yupi.yurpc.model.ServiceMetaInfo;

import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
* 一致性哈希负载均衡器
*
* @author <a href="https://github.com/liyupi">程序员鱼皮</a>
* @learn <a href="https://codefather.cn">鱼皮的编程宝典</a>
* @from <a href="https://yupi.icu">编程导航学习圈</a>
*/
public class ConsistentHashLoadBalancer implements LoadBalancer {

/**
* 一致性 Hash 环,存放虚拟节点
*/
private final TreeMap<Integer, ServiceMetaInfo> virtualNodes = new TreeMap<>();

/**
* 虚拟节点数
*/
private static final int VIRTUAL_NODE_NUM = 100;

@Override
public ServiceMetaInfo select(Map<String, Object> requestParams, List<ServiceMetaInfo> serviceMetaInfoList) {
if (serviceMetaInfoList.isEmpty()) {
return null;
}

// 构建虚拟节点环
for (ServiceMetaInfo serviceMetaInfo : serviceMetaInfoList) {
for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
int hash = getHash(serviceMetaInfo.getServiceAddress() + "#" + i);
virtualNodes.put(hash, serviceMetaInfo);
}
}

// 获取调用请求的 hash 值
int hash = getHash(requestParams);

// 选择最接近且大于等于调用请求 hash 值的虚拟节点
Map.Entry<Integer, ServiceMetaInfo> entry = virtualNodes.ceilingEntry(hash);
if (entry == null) {
// 如果没有大于等于调用请求 hash 值的虚拟节点,则返回环首部的节点
entry = virtualNodes.firstEntry();
}
return entry.getValue();
}


/**
* Hash 算法,可自行实现
*
* @param key
* @return
*/
private int getHash(Object key) {
return key.hashCode();
}
}

重试机制

不重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.yupi.yurpc.fault.retry;

import com.yupi.yurpc.model.RpcResponse;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Callable;

/**
* 不重试 - 重试策略
*
* @author <a href="https://github.com/liyupi">程序员鱼皮</a>
* @learn <a href="https://codefather.cn">鱼皮的编程宝典</a>
* @from <a href="https://yupi.icu">编程导航学习圈</a>
*/
@Slf4j
public class NoRetryStrategy implements RetryStrategy {

/**
* 重试
*
* @param callable
* @return
* @throws Exception
*/
public RpcResponse doRetry(Callable<RpcResponse> callable) throws Exception {
return callable.call();
}

}

固定时间重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.yupi.yurpc.fault.retry;

import com.github.rholder.retry.*;
import com.yupi.yurpc.model.RpcResponse;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
* 固定时间间隔 - 重试策略
*
* @author <a href="https://github.com/liyupi">程序员鱼皮</a>
* @learn <a href="https://codefather.cn">鱼皮的编程宝典</a>
* @from <a href="https://yupi.icu">编程导航学习圈</a>
*/
@Slf4j
public class FixedIntervalRetryStrategy implements RetryStrategy {

/**
* 重试
*
* @param callable
* @return
* @throws ExecutionException
* @throws RetryException
*/
public RpcResponse doRetry(Callable<RpcResponse> callable) throws ExecutionException, RetryException {
Retryer<RpcResponse> retryer = RetryerBuilder.<RpcResponse>newBuilder()
.retryIfExceptionOfType(Exception.class)
.withWaitStrategy(WaitStrategies.fixedWait(3L, TimeUnit.SECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.withRetryListener(new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
log.info("重试次数 {}", attempt.getAttemptNumber());
}
})
.build();
return retryer.call(callable);
}

}

image-20241213161255244

容错机制

image-20241213161344412

image-20241213161349841