使用Netty+Nacos+Protobuf制作RPC框架

使用Netty+Nacos+Protobuf制作RPC框架

简介

显现的功能

这个RPC实现了一些基本的功能:

  • 使用Netty来进行网络传输,效率比起传统的NIO要高很多。
  • 使用单例模式,在Netty获取Channel的过程中,会有一个ChannelProvider去提供Channel单例。
  • 使用Nacos作为服务的注册中心,用于管理注册的服务,当客户端请求发过来时,Nacos会寻找合适的服务返回给客户端消费。
  • 实现了负载均衡的功能,,客户端对于Nacos返回的服务列表,会使用负载均衡算法,选择一个自己需要的服务加入,目前实现了轮询算法和随机选取算法。
  • 加入了心跳检测机制,并不会发送完消息立即结束,而是保持的长连接,提高效率。
  • 使用Potobuf作为对象的的序列化工具,实现Netty中的编/解码的功能,提高了效率。
  • 实现了钩子函数,当服务端下线的时候会自动去Nacos注销服务。
  • 使用CompletableFuture来接受客户端返回的结果。

测试

由于使用Nacos,调试比较简单:
下载好Nacos,无论是win版还是linux版,在官网都有,比较方便;
但是由于Nacos一般都要配置数据库,为了方便测试,可以使用命令先进行单机运行

1
startup.cmd -m standalone

客户端:

1
2
3
4
5
6
7
8
9
10
public class NettyTestClient {
public static void main(String[] args) {
RpcClient client = new NettyClient(CommonSerializer.PROTOBUF_SERIALIZER);
RpcClientProxy rpcClientProxy = new RpcClientProxy(client);
HelloService helloService = rpcClientProxy.getProxy(HelloService.class);
HelloObject object = new HelloObject(114514, "Client send a Message");
String res = helloService.hello(object);
System.out.println(res);
}
}

服务端:

1
2
3
4
5
6
7
8
9
@ServiceScan
public class NettyTestServer {

public static void main(String[] args) {
RpcServer server = new NettyServer("127.0.0.1", 9999, CommonSerializer.PROTOBUF_SERIALIZER);
server.start();
}

}

之后会有一个测试结果:
客户端收到信息
image.png
服务端收到信息
image.png

服务端分析

首先服务端都会实现一个接口:

1
2
3
4
5
6
7
8
9
public interface RpcServer {

int DEFAULT_SERIALIZER = CommonSerializer.PROTOBUF_SERIALIZER;

void start();

<T> void publishService(T service, String serviceName);

}

这个接口通常定义了默认的序列化方法,开始方法,和发布服务的方法。
接着会有一个抽象类去实现这个接口:

AbstractRpcServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public abstract class AbstractRpcServer implements RpcServer {

protected Logger logger = LoggerFactory.getLogger(this.getClass());

protected String host;
protected int port;

protected ServiceRegistry serviceRegistry;
protected ServiceProvider serviceProvider;

public void scanServices() {
String mainClassName = ReflectUtil.getStackTrace();
Class<?> startClass;
try {
startClass = Class.forName(mainClassName);
if(!startClass.isAnnotationPresent(ServiceScan.class)) {
logger.error("启动类缺少 @ServiceScan 注解");
throw new RpcException(RpcError.SERVICE_SCAN_PACKAGE_NOT_FOUND);
}
} catch (ClassNotFoundException e) {
logger.error("出现未知错误");
throw new RpcException(RpcError.UNKNOWN_ERROR);
}
String basePackage = startClass.getAnnotation(ServiceScan.class).value();
if("".equals(basePackage)) {
basePackage = mainClassName.substring(0, mainClassName.lastIndexOf("."));
}
Set<Class<?>> classSet = ReflectUtil.getClasses(basePackage);
for(Class<?> clazz : classSet) {
if(clazz.isAnnotationPresent(Service.class)) {
String serviceName = clazz.getAnnotation(Service.class).name();
Object obj;
try {
obj = clazz.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
logger.error("创建 " + clazz + " 时有错误发生");
continue;
}
if("".equals(serviceName)) {
Class<?>[] interfaces = clazz.getInterfaces();
for (Class<?> oneInterface: interfaces){
publishService(obj, oneInterface.getCanonicalName());
}
} else {
publishService(obj, serviceName);
/*
这段代码是在判断 Service 注解中的 name 属性是否为空,如果为空,
则说明该服务实现类实现了多个接口,并且需要将每个接口都发布成一个独立的服务。
所以,代码通过获取该服务实现类的所有接口,然后将每个接口都作为一个独立的服务进行发布。
如果 name 属性不为空,则说明只需要将该服务实现类作为一个服务进行发布。
此时,代码直接将该服务实现类作为一个服务进行发布。
*/
}
}
}
}

@Override
public <T> void publishService(T service, String serviceName) {
serviceProvider.addServiceProvider(service, serviceName);
serviceRegistry.register(serviceName, new InetSocketAddress(host, port));
}

}

scanServices 的作用:这段代码是服务扫描的核心实现,它通过获取启动类的信息,获取服务扫描的基础包路径,然后扫描该路径下的所有类,判断是否有@Service注解,如果有,就将该服务发布到注册中心。
具体的实现流程如下:

  1. 调用ReflectUtil.getStackTrace()方法获取当前方法调用的栈信息,得到启动类的全限定名。
  2. 使用Class.forName()方法加载启动类,判断启动类是否被@ServiceScan注解所标注,如果没有则抛出异常。
  3. 获取@ServiceScan注解的参数值,即基础包路径。
  4. 调用ReflectUtil.getClasses()方法获取指定包下的所有类,遍历这些类,判断是否被@Service注解所标注。
  5. 如果被@Service注解所标注,则获取@Service注解的参数值,即服务名称,如果未指定服务名称,则获取该服务实现类实现的所有接口,并将该服务发布到注册中心。
  6. 如果指定了服务名称,则直接将该服务发布到注册中心。

该方法主要的功能就是扫描服务,将服务发布到注册中心,为后续的服务调用提供依据
接着我们来看看这个所需要的注解:

1
2
3
4
5
6
7
8
9
10
/**
* 表示一个服务提供类,用于远程接口的实现类
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Service {

public String name() default "";

}
1
2
3
4
5
6
7
8
9
10
/**
* 服务扫描的基包
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ServiceScan {

public String value() default "";

}

而上述提到的ReflectUtil这个类是一个比较常见的类,这个类是一个工具类,提供了两个静态方法:getStackTrace(): 返回当前调用栈顶部的类名。利用Java的反射机制,调用 new Throwable().getStackTrace() 方法获取当前调用栈信息,返回调用栈顶部的类名。
getClasses(String packageName): 获取指定包名下所有的类。该方法通过输入一个包名,然后通过反射机制查找该包下的所有类,返回一个 Set<Class<?>> 对象。该方法实现的过程比较复杂,具体过程为:

  1. 通过 Thread.currentThread().getContextClassLoader().getResources(packageDirName) 获取指定包名下的所有资源。
  2. 遍历所有资源,如果是文件,则通过递归方式查找该文件夹下所有的类,并将类对象添加到 Set<Class<?>> 集合中。
  3. 如果是 Jar 包,则通过 JarFile 对象查找该 Jar 包中的所有类,并将类对象添加到 Set<Class<?>> 集合中。

该工具类可以用于类加载器等需要动态加载类的场景,例如 Spring 框架中的 Bean 加载、RPC 框架中的服务注册等

NettyServer

接着便是这个RPC服务端的逻辑实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class NettyServer extends AbstractRpcServer {
//同时也继承了serviceRegistry和serviceProvider

private final CommonSerializer serializer;

public NettyServer(String host, int port) {
this(host, port, DEFAULT_SERIALIZER);
}

public NettyServer(String host, int port, Integer serializer) {
this.host = host;
this.port = port;
serviceRegistry = new NacosServiceRegistry();
serviceProvider = new ServiceProviderImpl();
this.serializer = CommonSerializer.getByCode(serializer);
scanServices();
}

@Override
public void start() {
ShutdownHook.getShutdownHook().addClearAllHook();
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.SO_BACKLOG, 256)
.option(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS))
.addLast(new CommonEncoder(serializer))
.addLast(new CommonDecoder())
.addLast(new NettyServerHandler());
}
});
ChannelFuture future = serverBootstrap.bind(host, port).sync();
future.channel().closeFuture().sync();

} catch (InterruptedException e) {
logger.error("启动服务器时有错误发生: ", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

}

这个类实现了一个基于Netty框架的RPC服务器,它继承了抽象类AbstractRpcServer,并且拥有服务注册表(serviceRegistry)和服务提供者(serviceProvider)的实例。
在构造函数中,传入了服务器的主机名和端口号,以及序列化器(serializer)。在构造函数中还调用了扫描服务的方法scanServices()。
在start()方法中,创建了两个EventLoopGroup,用于处理连接和IO的事件。然后使用ServerBootstrap创建了一个服务端的引导类,通过设置一系列的选项和处理器来配置Netty服务器。其中,ChannelInitializer是一个特殊的处理器,用于在Channel被创建时执行一些初始化操作。在这个ChannelInitializer中,注册了一个IdleStateHandler用于处理空闲连接,以及自定义的编解码器和处理器。
最后,通过调用bind()方法绑定主机名和端口号,并且调用sync()方法等待服务器启动完成。当关闭服务器时,调用shutdownGracefully()方法优雅地关闭EventLoopGroup。
这就是这个类的大体流程,接下来,可以一个个分开来看
其中,NacosServiceRegistry会返回一个服务注册器实例,但是这个实例实际上会调用:

1
2
3
4
5
6
7
8
9
@Override
public void register(String serviceName, InetSocketAddress inetSocketAddress) {
try {
NacosUtil.registerService(serviceName, inetSocketAddress);
} catch (NacosException e) {
logger.error("注册服务时有错误发生:", e);
throw new RpcException(RpcError.REGISTER_SERVICE_FAILED);
}
}

也就是NacosUtil去实现服务注册:

NacosUtil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class NacosUtil {

private static final Logger logger = LoggerFactory.getLogger(NacosUtil.class);

private static final NamingService namingService;
private static final Set<String> serviceNames = new HashSet<>();
private static InetSocketAddress address;

private static final String SERVER_ADDR = "127.0.0.1:8848";

static {
namingService = getNacosNamingService();
}

public static NamingService getNacosNamingService() {
try {
return NamingFactory.createNamingService(SERVER_ADDR);
} catch (NacosException e) {
logger.error("连接到Nacos时有错误发生: ", e);
throw new RpcException(RpcError.FAILED_TO_CONNECT_TO_SERVICE_REGISTRY);
}
}

public static void registerService(String serviceName, InetSocketAddress address) throws NacosException {
namingService.registerInstance(serviceName, address.getHostName(), address.getPort());
NacosUtil.address = address;
serviceNames.add(serviceName);

}

public static List<Instance> getAllInstance(String serviceName) throws NacosException {
return namingService.getAllInstances(serviceName);
}

public static void clearRegistry() {
if(!serviceNames.isEmpty() && address != null) {
String host = address.getHostName();
int port = address.getPort();
Iterator<String> iterator = serviceNames.iterator();
while(iterator.hasNext()) {
String serviceName = iterator.next();
try {
namingService.deregisterInstance(serviceName, host, port);
} catch (NacosException e) {
logger.error("注销服务 {} 失败", serviceName, e);
}
}
}
}
}

这是一个Nacos工具类,用于连接到Nacos服务注册中心并与之进行交互。主要包含以下几个方法:

  1. getNacosNamingService()方法用于获取NacosNamingService实例。
  2. registerService(String serviceName, InetSocketAddress address)方法用于向Nacos注册服务实例,即将提供服务的服务地址和端口注册到Nacos中,以便客户端可以通过服务名称查找到该服务。
  3. getAllInstance(String serviceName)方法用于获取指定服务名称下的所有服务实例,返回一个Instance列表。
  4. clearRegistry()方法用于清空注册中心中注册的服务实例,即将服务注销。

通过这个工具类,我们可以将服务注册到Nacos服务注册中心,并通过Nacos中心来查找并获取服务实例,以便客户端可以通过服务名称调用相应的服务。

ServiceProviderImpl

而ServiceProviderImpl默认的服务注册表,保存服务端本地服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ServiceProviderImpl implements ServiceProvider {

private static final Logger logger = LoggerFactory.getLogger(ServiceProviderImpl.class);

private static final Map<String, Object> serviceMap = new ConcurrentHashMap<>();
private static final Set<String> registeredService = ConcurrentHashMap.newKeySet();

@Override
public <T> void addServiceProvider(T service, String serviceName) {
if (registeredService.contains(serviceName)) return;
registeredService.add(serviceName);
serviceMap.put(serviceName, service);
logger.info("向接口: {} 注册服务: {}", service.getClass().getInterfaces(), serviceName);
}

@Override
public Object getServiceProvider(String serviceName) {
Object service = serviceMap.get(serviceName);
if (service == null) {
throw new RpcException(RpcError.SERVICE_NOT_FOUND);
}
return service;
}
}

这是一个服务提供者的默认实现类,它实现了 ServiceProvider 接口中的方法,可以将提供者实例添加到服务注册表中,提供了一种方便地访问服务的方式。在服务注册表中,服务名与服务实例之间的映射关系是使用 ConcurrentHashMap 实现的。它有两个主要方法:

  1. addServiceProvider(T service, String serviceName):将服务提供者添加到服务注册表中,当服务名已经存在于注册表中时,则不进行任何操作。
  2. getServiceProvider(String serviceName):根据服务名从服务注册表中获取相应的服务实例。如果服务不存在,则抛出 RpcException 异常,表示未找到服务。

ShutdownHook

当一个服务开启之后,也会开始一个钩子函数,它的Runtime类会使用getRuntime().addShutdownHook()方法,在服务结束前,注销掉所有的在Nacos的服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ShutdownHook {

private static final Logger logger = LoggerFactory.getLogger(ShutdownHook.class);

private static final ShutdownHook shutdownHook = new ShutdownHook();

public static ShutdownHook getShutdownHook() {
return shutdownHook;
}

public void addClearAllHook() {
logger.info("关闭后将自动注销所有服务");
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
NacosUtil.clearRegistry();
ThreadPoolFactory.shutDownAll();
}));
}

}

这个类是一个单例,它提供了一个静态方法getShutdownHook()来获取一个实例。它注册了一个JVM shutdown hook,该hook会在JVM关闭前被执行,清除所有注册到Nacos服务注册中心上的服务和所有线程池。这个类的作用是确保在JVM关闭前执行清除操作,避免可能的资源泄漏和数据一致性问题。

心跳检测

这里在SocketChannel中加入了一个IdleStateHandler,使其具有心跳检测功能
在 Netty 中,IdleStateHandler 是一个用于处理空闲状态的处理器。它可以在 Channel 上检测特定类型的空闲时间,并在这些时间段内未发生读取、写入或读写事件时触发相应的事件。常用的空闲状态类型有三种:READER_IDLE,WRITER_IDLE 和 ALL_IDLE。
IdleStateHandler 可以用于实现心跳机制,可以通过配置空闲时间间隔和触发事件来判断是否需要发送心跳包。它可以被添加到 Netty 的 ChannelPipeline 中,以监视 Channel 上的空闲事件,以便可以采取适当的措施,例如关闭连接或发送心跳消息。

NettyServerHandler

这里也同时添加了一个NettyServerHandler去处理在Channel中发生的时间。
先拿上面的心跳检测为例子,如果收到了一个心跳包,便会打印一条日志,告诉服务端收到了。而在长时间没有收到心跳包后,则会关闭上下文。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class NettyServerHandler extends SimpleChannelInboundHandler<RpcRequest> {

private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
private final RequestHandler requestHandler;

public NettyServerHandler() {
this.requestHandler = SingletonFactory.getInstance(RequestHandler.class);
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
try {
if(msg.getHeartBeat()) {
logger.info("接收到客户端心跳包...");
return;
}
logger.info("服务器接收到请求: {}", msg);
Object result = requestHandler.handle(msg);
if (ctx.channel().isActive() && ctx.channel().isWritable()) {
ctx.writeAndFlush(RpcResponse.success(result, msg.getRequestId()));
} else {
logger.error("通道不可写");
}
} finally {
ReferenceCountUtil.release(msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("处理过程调用时有错误发生:");
cause.printStackTrace();
ctx.close();
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.READER_IDLE) {
logger.info("长时间未收到心跳包,断开连接...");
ctx.close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}

}

SimpleChannelInboundHandler 是 Netty 中的一个基础类,实现了 ChannelInboundHandler 接口。它主要用于处理入站事件,即从对等端接收到的数据或状态更改事件,例如对等端连接或断开连接。与 ChannelInboundHandlerAdapter 不同的是,SimpleChannelInboundHandler 可以自动释放资源,因此不需要显示地调用 ReferenceCountUtil.release() 释放资源。
当数据从远程节点传入时,SimpleChannelInboundHandler 将自动将其转换为指定类型的对象,并在调用 channelRead0() 方法时向你提供该对象,你只需要处理传入的数据。可以使用这个类来构建各种应用程序,例如聊天应用程序、游戏服务器、文件传输应用程序等。
而这里的requestHandler是一个单例,表示全程都使用这个单例去处理请求,这样就不会浪费大量的资源去重复创建实例。

RequestHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class RequestHandler {

private static final Logger logger = LoggerFactory.getLogger(RequestHandler.class);
private static final ServiceProvider serviceProvider;

static {
serviceProvider = new ServiceProviderImpl();
}

public Object handle(RpcRequest rpcRequest) {
Object service = serviceProvider.getServiceProvider(rpcRequest.getInterfaceName());
return invokeTargetMethod(rpcRequest, service);
}

private Object invokeTargetMethod(RpcRequest rpcRequest, Object service) {
Object result;
try {
Method method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
result = method.invoke(service, rpcRequest.getParameters());
logger.info("服务:{} 成功调用方法:{}", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
return RpcResponse.fail(ResponseCode.METHOD_NOT_FOUND, rpcRequest.getRequestId());
}
return result;
}

}

这个RequestHandler使用的方法不多,最为主要的方法还是:
:::info
result = method.invoke(service, rpcRequest.getParameters());
:::
这个方法会调用Method实例的invoke方法,去执行相应的请求。
在Java中,Method类是反射机制的一部分,它代表一个类中的一个方法。可以使用Method类来获取关于方法的信息,如方法名、参数列表、返回类型、修饰符等,并且可以使用Method类来调用该方法。Method类提供了许多用于获取和调用方法的方法,如invoke()、getName()、getParameterTypes()、getReturnType()等。反射机制中的Method类可以使得在运行时动态地获取和调用类中的方法。

编解码器

编解码器可以说是整个RPC框架中最为重要的一部分,那么RPC为什么需要编解码器呢?
因为在RPC通信过程中,数据需要在网络中传输。在不同的计算机之间通信需要将对象序列化为字节流,传输完成后再反序列化为对象。编解码器的作用就是将对象序列化和反序列化的过程封装起来,让开发者可以更方便地进行通信。在Netty中,SimpleChannelInboundHandler类可以自动完成消息的解码和编码,大大简化了编解码器的编写过程。
那为什么不用直接用序列化?
虽然序列化可以将对象转换成字节流进行网络传输,但是它并不能满足RPC的需求。
RPC需要一个通用的方式来序列化和反序列化各种类型的消息,包括基本数据类型、复合数据类型和自定义类型等。而不同的序列化实现可能只支持特定的类型或数据格式,因此无法满足这个需求。
此外,RPC需要支持不同的编解码器,以便兼容不同的协议和框架。使用编解码器可以使得不同的实现之间相互兼容,也方便进行协议升级和兼容性处理。
因此,编解码器是RPC中必不可少的组件,它能够实现通用的序列化和反序列化,同时兼容不同的协议和框架,满足RPC的需求。

编码器

MessageToByteEncoder 是 Netty 提供的编码器抽象类,用于将消息转换为字节流进行网络传输。在 Netty 应用中,可以使用它将自定义的消息对象编码为二进制数据,以便通过网络进行传输。
实现 MessageToByteEncoder 需要重写 encode() 方法,该方法会在消息被写入通道前被自动调用。在 encode() 方法中,我们需要将消息对象转换为字节流,并将字节流写入到 ByteBuf 中。写入到 ByteBuf 中的字节流会在后续的 ChannelHandler 中被传递,最终通过网络传输到远程节点。
MessageToByteEncoder 中还提供了一些辅助方法,如 writeXXX() 系列方法可以将不同类型的数据写入到 ByteBuf 中,以及提供了一些钩子方法,可以在编码过程中对消息进行处理,比如对消息进行压缩、加密等操作。
总之,MessageToByteEncoder 是 Netty 提供的编码器抽象类,通过继承它可以实现自定义的消息编码器。
CommonEncoder继承了MessageToByteEncoder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class CommonEncoder extends MessageToByteEncoder {

private static final int MAGIC_NUMBER = 0xCAFEBABE;

private final CommonSerializer serializer;

public CommonEncoder(CommonSerializer serializer) {
this.serializer = serializer;
}

@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
out.writeInt(MAGIC_NUMBER);
if (msg instanceof RpcRequest) {
out.writeInt(PackageType.REQUEST_PACK.getCode());
} else {
out.writeInt(PackageType.RESPONSE_PACK.getCode());
}
out.writeInt(serializer.getCode());
byte[] bytes = serializer.serialize(msg);
out.writeInt(bytes.length);
out.writeBytes(bytes);
}

}

这是一个 Netty 中的编码器类,用于将消息对象转换为二进制字节流,以便在网络上进行传输。在 RPC 中,消息通常是由客户端和服务端之间相互传递的。因为在不同的机器之间传递数据时,需要将数据序列化为二进制流,才能在网络中传输。但是,不同的序列化方式可能具有不同的数据格式和协议,所以需要使用编解码器来统一数据格式和协议。这个类实现了 Netty 中的 MessageToByteEncoder 类,并将消息对象编码为二进制流,遵循了一个特定的协议,包括一个魔数、消息类型、序列化方式、消息长度和消息内容。其中,魔数用于识别协议版本,消息类型用于标识消息是请求还是响应,序列化方式用于指定消息内容的序列化方式,消息长度用于指定消息内容的长度,消息内容就是序列化后的消息体。这个类是 RPC 通信中必不可少的一部分。

解码器

ReplayingDecoder是Netty提供的一种特殊类型的解码器。与普通解码器不同,ReplayingDecoder可以在缓冲区数据不足时进行暂停,并在数据可用时恢复处理,而不是等待缓冲区填满。
具体来说,ReplayingDecoder类通过继承ByteToMessageDecoder类并使用状态机模式实现。状态机模式通过在每个状态中重写decode()方法来定义不同的处理行为。当状态更改时,它将转移到下一个状态,直到解码完成为止。
ReplayingDecoder的主要作用是简化解码器的实现,尤其是对于一些不确定数据长度的解码器。通过使用ReplayingDecoder,可以避免手动跟踪缓冲区中的字节数,从而减少出错的可能性。同时,ReplayingDecoder还可以提供更好的性能,因为它只需要处理缓冲区中实际可用的数据,而不是缓冲区中的所有数据
CommonDecoder继承了ReplayingDecoder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class CommonDecoder extends ReplayingDecoder {

private static final Logger logger = LoggerFactory.getLogger(CommonDecoder.class);
private static final int MAGIC_NUMBER = 0xCAFEBABE;

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magic = in.readInt();
if (magic != MAGIC_NUMBER) {
logger.error("不识别的协议包: {}", magic);
throw new RpcException(RpcError.UNKNOWN_PROTOCOL);
}
int packageCode = in.readInt();
Class<?> packageClass;
if (packageCode == PackageType.REQUEST_PACK.getCode()) {
packageClass = RpcRequest.class;
} else if (packageCode == PackageType.RESPONSE_PACK.getCode()) {
packageClass = RpcResponse.class;
} else {
logger.error("不识别的数据包: {}", packageCode);
throw new RpcException(RpcError.UNKNOWN_PACKAGE_TYPE);
}
int serializerCode = in.readInt();
CommonSerializer serializer = CommonSerializer.getByCode(serializerCode);
if (serializer == null) {
logger.error("不识别的反序列化器: {}", serializerCode);
throw new RpcException(RpcError.UNKNOWN_SERIALIZER);
}
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes);
Object obj = serializer.deserialize(bytes, packageClass);
out.add(obj);
}

}

这个类是一个Netty解码器,用于将字节流转换为对象。具体来说,它的作用是将从网络中接收到的字节流解码为指定的Java对象,以供后续处理。
在方法decode()中,它首先读取一个整数值,如果它不等于预定义的一个常量值,就会抛出一个RpcException异常,这表示该字节流不是正确的RPC协议包。接下来,它读取协议包类型和序列化器类型,并检查它们是否为预期值,否则将抛出异常。然后,它读取字节流的长度,并将剩余的字节读入到字节数组中。最后,它使用指定的序列化器将字节流反序列化为预期的Java对象,并将其添加到输出列表中

Protobuf

Protocol Buffers(简称protobuf)是一种轻便高效的数据序列化格式,由Google开发。它与XML和JSON等格式相比,具有更小的数据体积和更快的解析速度,同时也可以生成各种编程语言的代码,从而方便了跨语言的数据交换和通信。在Java中,我们可以通过使用Protobuf库来实现Protobuf的序列化和反序列化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class ProtobufSerializer implements CommonSerializer {

private LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
private Map<Class<?>, Schema<?>> schemaCache = new ConcurrentHashMap<>();

@Override
@SuppressWarnings("unchecked")
public byte[] serialize(Object obj) {
Class clazz = obj.getClass();
Schema schema = getSchema(clazz);
byte[] data;
try {
data = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} finally {
buffer.clear();
}
return data;
}

@Override
@SuppressWarnings("unchecked")
public Object deserialize(byte[] bytes, Class<?> clazz) {
Schema schema = getSchema(clazz);
Object obj = schema.newMessage();
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}

@Override
public int getCode() {
return SerializerCode.valueOf("PROTOBUF").getCode();
}

@SuppressWarnings("unchecked")
private Schema getSchema(Class clazz) {
Schema schema = schemaCache.get(clazz);
if (Objects.isNull(schema)) {
// 这个schema通过RuntimeSchema进行懒创建并缓存
// 所以可以一直调用RuntimeSchema.getSchema(),这个方法是线程安全的
schema = RuntimeSchema.getSchema(clazz);
if (Objects.nonNull(schema)) {
schemaCache.put(clazz, schema);
}
}
return schema;
}

}

这是一个实现了CommonSerializer接口的类,用于将Java对象序列化为Protobuf格式的字节数组,或者将字节数组反序列化为Java对象。
具体来说,该类中的serialize方法将一个Java对象序列化为Protobuf格式的字节数组,实现过程如下:

  1. 获取对象的类类型Class clazz。
  2. 通过getSchema方法获取该类对应的Schema对象。
  3. 使用该Schema对象和LinkedBuffer对象调用ProtostuffIOUtil.toByteArray方法将Java对象序列化为字节数组。
  4. 最后清空LinkedBuffer对象并返回序列化后的字节数组。

而deserialize方法则是将字节数组反序列化为Java对象:

  1. 获取对象的类类型Class clazz。
  2. 通过getSchema方法获取该类对应的Schema对象。
  3. 调用schema.newMessage()创建一个该类的空对象。
  4. 使用字节数组和Schema对象调用ProtostuffIOUtil.mergeFrom方法将字节数组反序列化为Java对象并返回。

同时,为了提高性能,该类中使用了缓存机制,通过ConcurrentHashMap缓存Schema对象,以便在下次序列化或反序列化时能够更快地获取Schema对象,避免了重复创建的开销。

在具体的分析下面类的作用:

LinkedBuffer

LinkedBuffer是Protostuff序列化库中的一个类,用于在序列化过程中存储数据。它是一个基于链表的动态缓存区,它会自动根据当前写入数据的大小来调整缓存区的大小。
具体来说,LinkedBuffer维护了一个字节数组(即缓存区),一个指向缓存区首部的指针和一个指向缓存区尾部的指针。当我们往缓存区写入数据时,LinkedBuffer会先检查当前剩余的空间是否足够,如果不够则会自动扩展缓存区。扩展时会新建一个更大的缓存区,并将当前缓存区中的数据复制到新缓存区中,然后将新缓存区设置为当前缓存区。
LinkedBuffer使用链表来管理多个缓存区,每次扩展时都会新建一个缓存区并添加到链表尾部。这样做的好处是可以避免频繁的内存分配和拷贝,从而提高序列化性能。
在上面的代码中,LinkedBuffer被用于在ProtobufSerializer类中序列化对象时存储数据。当我们调用ProtostuffIOUtil.toByteArray()方法将一个对象序列化为字节数组时,需要传入一个LinkedBuffer对象作为参数,这个对象会在序列化过程中被自动扩展。当序列化完成后,我们需要手动调用LinkedBuffer.clear()方法清空缓存区,以便下次使用。

Schema

在 Protobuf 序列化中,Schema 是一个用于描述消息结构的类,类似于 Java 对象中的 Class。Schema 类的实例提供了一些方法来获取消息的字段和类型信息,从而可以将消息序列化和反序列化为二进制数据。
在使用 Protobuf 进行序列化时,我们需要为每个消息类创建一个 Schema 对象,并将其缓存起来以供重复使用。这样可以提高序列化和反序列化的效率,避免重复创建和解析 Schema 对象。
在上面提供的 ProtobufSerializer 类中,getSchema 方法用于获取指定类型的 Schema 对象。如果缓存中已经存在该类型的 Schema 对象,则直接返回;否则,使用 RuntimeSchema.getSchema 方法创建一个新的 Schema 对象,并将其存入缓存中。这样,在序列化和反序列化时,就可以直接使用缓存中的 Schema 对象,提高了性能。

ProtostuffIOUtil

ProtostuffIOUtil是Protostuff序列化框架中的一个工具类,主要提供了将Java对象序列化成byte数组和将byte数组反序列化成Java对象的功能。
它提供了以下主要的静态方法:

  • toByteArray(T message, Schema schema, LinkedBuffer buffer):将一个Java对象序列化成byte数组。
  • fromByteArray(byte[] data, T message, Schema schema):将一个byte数组反序列化成Java对象。
  • mergeFrom(byte[] data, T message, Schema schema):将一个byte数组中的数据合并到一个Java对象中。

其中,Schema是Protostuff序列化框架中的一个关键接口,用于描述Java对象的序列化格式。而LinkedBuffer则是一个可扩容的缓冲区,用于存储序列化后的数据。在使用Protostuff进行序列化时,可以通过LinkedBuffer.allocate()方法创建一个缓冲区,用于存储序列化后的数据。

RuntimeSchema

RuntimeSchema是Protostuff库的一个类,它提供了将Java类转换为Protobuf格式的Schema的功能。在使用Protobuf进行序列化和反序列化时,需要提供一个Schema来指定序列化的字段、类型等信息。RuntimeSchema的作用就是根据Java类的结构生成一个对应的Schema。
通常情况下,使用Protobuf进行序列化和反序列化时,需要手动定义一个Proto文件来描述消息的结构。而使用RuntimeSchema,可以将Java类当作Proto文件来使用,它会自动生成一个对应的Schema。
需要注意的是,由于RuntimeSchema是在运行时生成的,因此会对性能产生一定影响。在高性能场景中,建议使用预编译的Proto文件来进行序列化和反序列化。

客户端分析

要实现客户端的逻辑,最基本的就是要实现动态代理:
InvocationHandler 是 Java 标准库中的一个接口,它用于实现动态代理。
动态代理是一种运行时生成代理对象的技术。使用动态代理可以在运行时动态地创建一个实现特定接口的代理类,这个代理类可以将所有方法调用委托给指定的对象或方法。在委托调用前或调用后,代理类可以执行额外的逻辑,例如统计方法调用次数、记录方法调用日志等。
InvocationHandler 接口定义了一个方法 invoke,该方法会在代理类每次调用方法时被调用。该方法有三个参数:

  1. proxy:代理对象
  2. method:被调用的方法
  3. args:被调用方法的参数列表

invoke 方法的返回值是 Object 类型,它表示被调用方法的返回值。因此,当我们想要使用动态代理技术时,需要实现 InvocationHandler 接口并重写 invoke 方法,来控制代理类如何处理方法调用

RpcClientProxy

RpcClientProxy 将实现 InvocationHandler 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class RpcClientProxy implements InvocationHandler {

private static final Logger logger = LoggerFactory.getLogger(RpcClientProxy.class);

private final RpcClient client;

public RpcClientProxy(RpcClient client) {
this.client = client;
}

@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> clazz) {
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
}

@SuppressWarnings("unchecked")
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
logger.info("调用方法: {}#{}", method.getDeclaringClass().getName(), method.getName());
RpcRequest rpcRequest = new RpcRequest(UUID.randomUUID().toString(), method.getDeclaringClass().getName(),
method.getName(), args, method.getParameterTypes(), false);
RpcResponse rpcResponse = null;
if (client instanceof NettyClient) {
try {
CompletableFuture<RpcResponse> completableFuture = (CompletableFuture<RpcResponse>) client.sendRequest(rpcRequest);
rpcResponse = completableFuture.get();
} catch (Exception e) {
logger.error("方法调用请求发送失败", e);
return null;
}
}
if (client instanceof SocketClient) {
rpcResponse = (RpcResponse) client.sendRequest(rpcRequest);
}
RpcMessageChecker.check(rpcRequest, rpcResponse);
return rpcResponse.getData();
}
}

这段代码定义了一个远程调用的客户端代理类RpcClientProxy,实现了InvocationHandler接口,用于生成一个远程服务接口的代理对象。其中,RpcClient是一个抽象类,NettyClient和SocketClient分别是其子类,用于不同的网络传输方式进行远程调用。
该类中的getProxy方法,使用了Java动态代理技术,生成了一个实现了远程服务接口的代理对象,该代理对象的所有方法调用都会被拦截并转化为远程调用,从而实现了RPC远程调用的透明化。
在invoke方法中,通过封装一个RpcRequest对象来表示对远程服务的调用,并通过客户端发送请求获取到返回结果RpcResponse,最后将返回结果中的数据返回给调用方。在发送远程调用请求之前,还进行了一些简单的参数检查,确保请求的正确性和完整性。

NettyClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class NettyClient implements RpcClient {

private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
private static final EventLoopGroup group;
private static final Bootstrap bootstrap;

static {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class);
}

private final ServiceDiscovery serviceDiscovery;
private final CommonSerializer serializer;

private final UnprocessedRequests unprocessedRequests;

public NettyClient() {
this(DEFAULT_SERIALIZER, new RandomLoadBalancer());
}
public NettyClient(LoadBalancer loadBalancer) {
this(DEFAULT_SERIALIZER, loadBalancer);
}
public NettyClient(Integer serializer) {
//随机轮询策略来进行负载均衡
this(serializer, new RandomLoadBalancer());
}
public NettyClient(Integer serializer, LoadBalancer loadBalancer) {
//另一个构造方法,表示自定义负载均衡策略

//注入服务发现
this.serviceDiscovery = new NacosServiceDiscovery(loadBalancer);
//序列化器
this.serializer = CommonSerializer.getByCode(serializer);
//保留意见
this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class);
}

@Override
public CompletableFuture<RpcResponse> sendRequest(RpcRequest rpcRequest) {
if (serializer == null) {
logger.error("未设置序列化器");
throw new RpcException(RpcError.SERIALIZER_NOT_FOUND);
}
CompletableFuture<RpcResponse> resultFuture = new CompletableFuture<>();
try {
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest.getInterfaceName());
Channel channel = ChannelProvider.get(inetSocketAddress, serializer);
if (!channel.isActive()) {
group.shutdownGracefully();
return null;
}
unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);
channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener) future1 -> {
if (future1.isSuccess()) {
logger.info(String.format("客户端发送消息: %s", rpcRequest.toString()));
} else {
future1.channel().close();
resultFuture.completeExceptionally(future1.cause());
logger.error("发送消息时有错误发生: ", future1.cause());
}
});
} catch (InterruptedException e) {
unprocessedRequests.remove(rpcRequest.getRequestId());
logger.error(e.getMessage(), e);
Thread.currentThread().interrupt();
}
return resultFuture;
}

}

这段代码定义了一个基于Netty的RPC客户端,实现了RpcClient接口,并且封装了发送请求的具体细节。
在类的静态代码块中,创建了一个EventLoopGroup和一个Bootstrap对象,用于配置Netty客户端。EventLoopGroup是处理事件循环的抽象类,Bootstrap则是用于客户端引导的类。在其中使用了NioEventLoopGroup和NioSocketChannel类来实现NIO客户端,这里用到了Netty的API。
接着在类的构造方法中,初始化了一个服务发现对象和一个序列化器对象。服务发现对象是用于从服务注册中心获取服务地址的。序列化器对象是用于将请求和响应对象序列化和反序列化的,该类的序列化器可以通过传入参数来选择使用哪种类型的序列化器。
在sendRequest方法中,首先判断序列化器是否为空,如果为空则抛出异常。然后通过服务发现对象获取到远程服务的地址,根据地址获取一个Channel对象,ChannelProvider.get方法会返回一个新的Channel或者已有的Channel,如果没有就会创建一个新的Channel。在获取到Channel对象后,使用Netty的writeAndFlush方法将请求对象发送到服务端,使用addListener添加一个ChannelFutureListener监听器,可以在发送成功或失败时执行相应的操作,如打印日志或者回调。发送请求时还将该请求的请求ID和响应结果对应的CompletableFuture对象存储到一个全局的UnprocessedRequests对象中。
最后,sendRequest方法返回一个CompletableFuture对象,用于异步等待响应结果。在响应结果到达时,UnprocessedRequests对象会将响应结果的CompletableFuture对象取出并使用complete方法设置结果。而调用sendRequest方法的线程会在CompletableFuture对象的get方法上阻塞,直到CompletableFuture对象的complete方法被调用为止,然后会返回响应结果。
接着我们一步步看这个客户端有哪些用到的类:

负载均衡

负载均衡会共用一个接口:

1
2
3
4
5
public interface LoadBalancer {

Instance select(List<Instance> instances);

}

然后有着其对应的实现。
随机策略:

1
2
3
4
5
6
7
8
public class RandomLoadBalancer implements LoadBalancer {

@Override
public Instance select(List<Instance> instances) {
return instances.get(new Random().nextInt(instances.size()));
}

}

轮询策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class RoundRobinLoadBalancer implements LoadBalancer {

private int index = 0;

@Override
public Instance select(List<Instance> instances) {
if(index >= instances.size()) {
index %= instances.size();
}
return instances.get(index++);
}

}

但是可以看到,无论是哪一种策略,都是要先获取到Instance实例,然后使用相应的负载均衡策略,那么这个实例,则是Nacos包提供的一个类,那么我们也自然需要一个Nacos方法来提供。

NacosServiceDiscovery

服务发现类,同时也对上述的负载均衡做出了解释。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class NacosServiceDiscovery implements ServiceDiscovery {

private static final Logger logger = LoggerFactory.getLogger(NacosServiceDiscovery.class);

private final LoadBalancer loadBalancer;

public NacosServiceDiscovery(LoadBalancer loadBalancer) {
if(loadBalancer == null) this.loadBalancer = new RandomLoadBalancer();
else this.loadBalancer = loadBalancer;
}

@Override
public InetSocketAddress lookupService(String serviceName) {
try {
List<Instance> instances = NacosUtil.getAllInstance(serviceName);
if(instances.size() == 0) {
logger.error("找不到对应的服务: " + serviceName);
throw new RpcException(RpcError.SERVICE_NOT_FOUND);
}
Instance instance = loadBalancer.select(instances);
return new InetSocketAddress(instance.getIp(), instance.getPort());
} catch (NacosException e) {
logger.error("获取服务时有错误发生:", e);
}
return null;
}

}

这段代码定义了一个 NacosServiceDiscovery 类,实现了 ServiceDiscovery 接口,主要用于在基于 Nacos 注册中心的服务发现中获取服务地址。具体作用如下:

  1. NacosServiceDiscovery 类有一个构造方法,用于初始化负载均衡策略,如果未指定,则默认使用随机负载均衡策略。
  2. lookupService 方法用于获取指定服务名对应的 InetSocketAddress,通过调用 NacosUtil 工具类获取所有的服务实例,然后使用负载均衡策略选择一个实例,最终返回该实例的地址信息。
  3. 如果找不到对应的服务,则会抛出 RpcException 异常,并记录错误日志。

总之,该类用于实现基于 Nacos 注册中心的服务发现功能,可以根据服务名从注册中心获取服务实例并进行负载均衡选择,返回可用的服务地址。

UnprocessedRequests

这个类名为 UnprocessedRequests,用于处理未处理的RPC请求。同时也是更好的控制数据的获取,因为如果使用Netty自带的阻塞获取方法,太过于复杂了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class UnprocessedRequests {

private static ConcurrentHashMap<String, CompletableFuture<RpcResponse>> unprocessedResponseFutures = new ConcurrentHashMap<>();

public void put(String requestId, CompletableFuture<RpcResponse> future) {
unprocessedResponseFutures.put(requestId, future);
}

public void remove(String requestId) {
unprocessedResponseFutures.remove(requestId);
}

public void complete(RpcResponse rpcResponse) {
CompletableFuture<RpcResponse> future = unprocessedResponseFutures.remove(rpcResponse.getRequestId());
if (null != future) {
future.complete(rpcResponse);
} else {
throw new IllegalStateException();
}
}
}

在这个类中,使用了一个静态的 ConcurrentHashMap 对象,用于存储未处理的RPC请求。
其中,Key 值为请求的ID,Value 值为一个 CompletableFuture 对象,用于异步获取 RPC 响应结果。
这可以用来记录客户端发送的请求并等待响应。主要包含以下方法:

  1. put(String requestId, CompletableFuture future):将请求 ID 和对应的 CompletableFuture 存储到 ConcurrentHashMap 中。
  2. remove(String requestId):从 ConcurrentHashMap 中删除指定的请求 ID。
  3. complete(RpcResponse rpcResponse):根据响应中的请求 ID 找到对应的 CompletableFuture 并将响应数据传递给它。如果没有找到对应的 CompletableFuture,则抛出 IllegalStateException 异常。

这个类的作用是确保客户端发送的每个请求都有一个对应的 CompletableFuture 实例,用来等待服务器响应。在客户端收到服务器响应后,可以使用 UnprocessedRequests.complete() 方法将响应数据传递给对应的 CompletableFuture。这种机制使得客户端可以异步发送请求并等待响应,而不需要阻塞线程。

CompletableFuture

CompletableFuture类是Java8引入的一个异步编程工具,用于处理异步操作的结果。它提供了一些方法来处理异步任务的结果,例如将结果传递给下一个任务,等待任务完成,组合多个任务等。
在异步编程中,通常会使用回调函数来处理异步任务的结果,但这种方式会使代码变得冗长且难以维护。CompletableFuture类的出现,使得异步编程变得更加简单和可读。
CompletableFuture类有以下几个主要特点:

  1. 可以将一个异步操作的结果传递给下一个操作,这种操作被称为”组合”。
  2. 可以等待一个异步操作的结果,并在操作完成后执行一些操作,例如打印日志、释放资源等。
  3. 可以在多个异步操作完成后执行一些操作,例如将它们的结果组合起来,计算它们的平均值等。
  4. 可以通过异常处理机制来处理异步操作中的异常。

使用CompletableFuture类,可以更加方便地处理异步任务,提高代码的可读性和可维护性。同时,它也是Java并发编程中非常有用的工具之一。

ChannelProvider

这段代码实现了一个用于获取客户端 Channel 的工具类 ChannelProvider。它维护了一个 Map 类型的 channels 成员变量,用于缓存已经连接的 Channel 对象,通过 get 方法获取指定地址的 Channel 对象。

  1. 在 get 方法中,首先根据地址和序列化器生成 key,从 channels 缓存中查找是否已有对应的 Channel 对象。如果有,就返回已有的 Channel 对象;如果没有,就通过 bootstrap 进行连接。在连接成功后,将新建的 Channel 对象存入 channels 缓存,并返回该对象。如果连接失败,返回 null。
  2. 在 connect 方法中,通过 CompletableFuture 异步获取连接结果。在连接成功后,将 Channel 对象作为 CompletableFuture 的返回值。
  3. initializeBootstrap 方法则初始化 Bootstrap 对象,并设置一些常用的参数,如连接超时时间、是否启用 TCP 底层心跳机制等。

ChannelProvider 为客户端连接提供了一个通用的方法,简化了客户端连接的过程,提高了代码的复用性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class ChannelProvider {

private static final Logger logger = LoggerFactory.getLogger(ChannelProvider.class);
private static EventLoopGroup eventLoopGroup;
private static Bootstrap bootstrap = initializeBootstrap();

private static Map<String, Channel> channels = new ConcurrentHashMap<>();

public static Channel get(InetSocketAddress inetSocketAddress, CommonSerializer serializer) throws InterruptedException {
String key = inetSocketAddress.toString() + serializer.getCode();
if (channels.containsKey(key)) {
Channel channel = channels.get(key);
if(channels != null && channel.isActive()) {
return channel;
} else {
channels.remove(key);
}
}
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
/*自定义序列化编解码器*/
// RpcResponse -> ByteBuf
ch.pipeline().addLast(new CommonEncoder(serializer))
.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS))
.addLast(new CommonDecoder())
.addLast(new NettyClientHandler());
}
});
Channel channel = null;
try {
channel = connect(bootstrap, inetSocketAddress);
} catch (ExecutionException e) {
logger.error("连接客户端时有错误发生", e);
return null;
}
channels.put(key, channel);
return channel;
}

private static Channel connect(Bootstrap bootstrap, InetSocketAddress inetSocketAddress) throws ExecutionException, InterruptedException {
CompletableFuture<Channel> completableFuture = new CompletableFuture<>();
bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
logger.info("客户端连接成功!");
completableFuture.complete(future.channel());
} else {
throw new IllegalStateException();
}
});
return completableFuture.get();
}

private static Bootstrap initializeBootstrap() {
eventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
//连接的超时时间,超过这个时间还是建立不上的话则代表连接失败
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
//是否开启 TCP 底层心跳机制
.option(ChannelOption.SO_KEEPALIVE, true)
//TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。
.option(ChannelOption.TCP_NODELAY, true);
return bootstrap;
}

}

更详细一点的说:
这段代码实现了一个用于获取Netty客户端Channel的工具类ChannelProvider。它提供了一个get方法,该方法接收一个InetSocketAddress类型的参数和一个序列化器CommonSerializer,用于获取与指定服务提供者(IP地址和端口号)之间的连接。该方法首先将InetSocketAddress和序列化器的编码方式作为key,从Map中查找已有的Channel。如果找到的Channel是可用的,那么直接返回这个Channel。如果找到的Channel不可用,那么将它从Map中删除。接下来创建一个新的Channel,该Channel使用了上述编码方式进行了初始化,并且连接到了指定的服务提供者,最后将新创建的Channel保存到Map中。
在ChannelProvider类中,包含了一个静态的EventLoopGroup类型的变量eventLoopGroup和一个静态的Bootstrap类型的变量bootstrap。在类初始化时,这些变量被初始化为一个NioEventLoopGroup和一个Bootstrap实例。Bootstrap实例会初始化连接到远程服务提供者的客户端的参数,包括TCP连接、超时时间、TCP底层心跳机制等。这就是Channel的模板,创建一次就好了,不用每次都创建一次,这样可以很好的减少代码冗余和提高复用性。

  • get()方法是ChannelProvider的主要方法。它接收一个InetSocketAddress类型的参数和一个序列化器CommonSerializer,用于获取连接到指定服务提供者的Channel。首先,根据传入的参数,生成一个唯一的key,用于从Map中查找是否已经存在一个可用的Channel。如果找到的Channel是可用的,直接返回这个Channel对象。否则,创建一个新的Channel,并且将它保存到Map中。最后,返回新创建的Channel对象。
  • initChannel()方法用于初始化客户端Channel的pipeline。在这里,我们首先添加了一个自定义的序列化编解码器,然后添加了一个心跳检测处理器IdleStateHandler、一个通用解码器CommonDecoder和一个客户端处理器NettyClientHandler。这些处理器将按照顺序添加到客户端Channel的pipeline中。
  • connect()方法用于创建连接到指定服务提供者的Channel,并返回连接成功后的Channel对象。为了处理异步连接的结果,它使用了一个CompletableFuture对象completableFuture,该对象用于接收连接结果。当连接成功时,completableFuture将被设置为连接成功的Channel对象;当连接失败时,completableFuture将抛出异常。

这个Channel包含的一个NettyClientHandler,有相应对收到请求的处理:

NettyClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {

private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);

private final UnprocessedRequests unprocessedRequests;

public NettyClientHandler() {
this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class);
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
try {
logger.info(String.format("客户端接收到消息: %s", msg));
unprocessedRequests.complete(msg);
} finally {
ReferenceCountUtil.release(msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("过程调用时有错误发生:");
cause.printStackTrace();
ctx.close();
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.WRITER_IDLE) {
logger.info("发送心跳包 [{}]", ctx.channel().remoteAddress());
Channel channel = ChannelProvider.get((InetSocketAddress) ctx.channel().remoteAddress(), CommonSerializer.getByCode(CommonSerializer.DEFAULT_SERIALIZER));
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setHeartBeat(true);
channel.writeAndFlush(rpcRequest).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}

这段代码是一个 Netty 客户端处理器,用于处理服务器发送的响应消息。它继承了 Netty 的 SimpleChannelInboundHandler 类,实现了其中的 channelRead0() 方法和 exceptionCaught() 方法。
在channelRead0()方法中,通过UnprocessedRequests对象的complete()方法处理返回的RpcResponse对象。在此方法中,使用logger打印接收到的消息,然后调用UnprocessedRequests.complete()方法,将对应的CompletableFuture对象标记为完成,并将RpcResponse对象作为结果
在 exceptionCaught() 方法中,发生异常时,会先记录日志,然后关闭客户端通道。
另外,该类还重写了 userEventTriggered() 方法,用于发送心跳包。如果客户端在一段时间内没有发送数据,则会自动触发该方法,并向服务器发送一个心跳包。当客户端发送心跳包时,会调用 ChannelProvider 类的 get() 方法获取一个 Channel 对象,并向该 Channel 对象写入一个标记了心跳标志的 RpcRequest 对象,最后调用 close() 方法关闭该 Channel。

完结