RPC又称远程过程调用(Remote Procedure Call),用于解决分布式系统中服务之间的调用问题。通俗地讲,就是开发者能够像调用本地方法一样调用远程的服务。

RPC框架一般必须包含三个组件,分别是客户端、服务端以及注册中心,一次完整的RPC调用流程一般为:

  1. 服务端启动服务后,将他提供的服务列表发布到注册中心(服务注册)
  2. 客户端会向注册中心订阅相关的服务地址(服务订阅)
  3. 客户端通常会利用本地代理模块 Proxy 向服务端发起远程过程调用,Proxy 负责将调用的方法、参数等数据转化为网络字节流
  4. 客户端从服务列表中根据负载均衡策略选择一个服务地址,并将数据通过网络发送给服务端
  5. 服务端得到数据后,调用对应的服务,然后将结果通过网络返回给客户端

虽然 RPC 调用流程很容易理解,但是实现一个完整的 RPC 框架设计到很多内容,例如服务注册与发现、通信协议与序列化、负载均衡、动态代理等

项目目标

  • 实现基于Netty/Socket/Http三种方式进行网路通信
  • 自定义消息协议,编解码器
  • 五种序列化算法(JDK、JSON、HESSIAN、KRYO、PROTOSTUFF)
  • 三种负载均衡算法(RoundRobin、Random、ConsistentHash)
  • 两种动态代理(JDK、CGLIB)
  • 基于 Zookeeper 的服务注册与发现,增加服务本地缓存与监听
  • 集成 Spring,自定义注解提供 RPC 组件扫描、服务注册、服务消费
  • 集成 SpringBoot,完成自动配置
  • 增加 Netty 心跳机制,复用 Channel 连接
  • 实现自定义 SPI 机制

编解码模块

粘包半包解决

这里采取的是消息长度 + 消息内容来解决的此问题,将每一条消息分为header和body,header中包含body的长度

/**
* 粘包半包编码器,使用固定长度的帧解码器,通过约定用定长字节表示接下来数据的长度
* 非共享,保存了 ByteBuf 的状态信息
*/
public class RpcFrameDecoder extends LengthFieldBasedFrameDecoder {

k RpcFrameDecoder#RpcFrameDecoder(int, int, int)}

public RpcFrameDecoder() {
this(16384, 12, 4); // 前缀长度16字节
}

/**
* 构造方法
*
* @param maxFrameLength 数据帧的最大长度
* @param lengthFieldOffset 长度域的偏移字节数
* @param lengthFieldLength 长度域所占的字节数
*/
public RpcFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
}

}

注意这里我给定的消息体最大长度是16384Byte,即16KB,如果消息体长度比这个要大,就会抛出异常,无法接收

Netty官方建议不要轻易超过 8MB,否则可能导致内存过度分配、内存攻击等问题

编解码实现

编码部分是出站处理,需要编写RpcMessage模块,即header和body,header是自定义协议:

---------------------------------------------------------------------
| 魔数 (4byte) | 版本号 (1byte) | 序列化算法 (1byte) | 消息类型 (1byte) |
-------------------------------------------------------------------
| 状态类型 (1byte) | 消息序列号 (4byte) | 消息长度 (4byte) |
---------------------------------------------------------------------
| 消息内容 (不固定) |
---------------------------------------------------------------------

共16字节,在粘包半包解码器中定义了偏移量。

body部分使用序列化算法进行序列化,然后向下传递。

解码部分,先验证魔数是否符合要求,然后再进行后续的解码,在使用反序列化算法解析body部分获取消息体,然后向下传递。

@Sharable	// 不存在线程竞争
public class SharableRpcMessageCodec extends MessageToMessageCodec<ByteBuf, RpcMessage> {

// 编码器为出站处理,将 RpcMessage 编码为 ByteBuf 对象
@Override
protected void encode(ChannelHandlerContext ctx, RpcMessage msg, List<Object> out) throws Exception {
ByteBuf buf = ctx.alloc().buffer();
MessageHeader header = msg.getHeader();
// 4字节 魔数
buf.writeBytes(header.getMagicNum());
// 1字节 版本号
buf.writeByte(header.getVersion());
// 1字节 序列化算法
buf.writeByte(header.getSerializerType());
// 1字节 消息类型
buf.writeByte(header.getMessageType());
// 1字节 消息状态
buf.writeByte(header.getMessageStatus());
// 4字节 消息序列号
buf.writeInt(header.getSequenceId());

// 取出消息体
Object body = msg.getBody();
// 获取序列化算法
Serialization serialization = SerializationFactory
.getSerialization(SerializationType.parseByType(header.getSerializerType()));
// 进行序列化
byte[] bytes = serialization.serialize(body);
// 设置消息体长度
header.setLength(bytes.length);

// 4字节 消息内容长度
buf.writeInt(header.getLength());

// 不固定字节 消息内容字节数组
buf.writeBytes(bytes);

// 传递到下一个出站处理器
out.add(buf);
}

// 解码器为入站处理,将 ByteBuf 对象解码成 RpcMessage 对象
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
// 4字节 魔数
int len = ProtocolConstants.MAGIC_NUM.length;
byte[] magicNum = new byte[len];
msg.readBytes(magicNum, 0, len);
// 判断魔数是否正确,不正确表示非协议请求,不进行处理
for (int i = 0; i < len; i++) {
if (magicNum[i] != ProtocolConstants.MAGIC_NUM[i]) {
throw new IllegalArgumentException("Unknown magic code: " + Arrays.toString(magicNum));
}
}

// 1字节 版本号
byte version = msg.readByte();
// 检查版本号是否一致
if (version != ProtocolConstants.VERSION) {
throw new IllegalArgumentException("The version isn't compatible " + version);
}

// 1字节 序列化算法
byte serializeType = msg.readByte();
// 1字节 消息类型
byte messageType = msg.readByte();
// 1字节 消息状态
byte messageStatus = msg.readByte();
// 4字节 消息序列号
int sequenceId = msg.readInt();
// 4字节 长度
int length = msg.readInt();

byte[] bytes = new byte[length];
msg.readBytes(bytes, 0, length);

// 构建协议头部信息
MessageHeader header = MessageHeader.builder()
.magicNum(magicNum)
.version(version)
.serializerType(serializeType)
.messageType(messageType)
.sequenceId(sequenceId)
.messageStatus(messageStatus)
.length(length).build();

// 获取反序列化算法
Serialization serialization = SerializationFactory
.getSerialization(SerializationType.parseByType(serializeType));
// 获取消息枚举类型
MessageType type = MessageType.parseByType(messageType);
RpcMessage protocol = new RpcMessage();
protocol.setHeader(header);
if (type == MessageType.REQUEST) {
// 进行反序列化
RpcRequest request = serialization.deserialize(RpcRequest.class, bytes);
protocol.setBody(request);
} else if (type == MessageType.RESPONSE) {
// 进行反序列化
RpcResponse response = serialization.deserialize(RpcResponse.class, bytes);
protocol.setBody(response);
} else if (type == MessageType.HEARTBEAT_REQUEST || type == MessageType.HEARTBEAT_RESPONSE) {
String message = serialization.deserialize(String.class, bytes);
protocol.setBody(message);
}
// 传递到下一个处理器
out.add(protocol);
}
}

序列化算法

客户端和服务端在通信过程中肯定要传输数据,但是这些数据不可能是直接明文传输的,我们需要对数据进行编码,那么该如何编解码呢?

如果采用TCP协议,我们需要将调用的接口、方法、请求参数、调用属性等信息序列化成二进制字节流传递给服务提供方,服务端接收到数据后,再把二进制字节流反序列化得到调用信息,然后利用反射的原理调用对应方法,最后将返回结果、返回码、异常信息等返回给客户端。所谓序列化和反序列化就是将对象转换成二进制流以及将二进制流再转换成对象的过程。因为网络通信依赖于字节流,而且这些请求信息都是不确定的,所以一般会选用通用且高效的序列化算法。比较常用的序列化算法有FastJson、Kryo、Hessian、Protobuf等,这些第三方序列化算法都比Java原生的序列化操作都更加高效

Dubbo支持多种序列化算法,并定义了Serialization接口规范,所有序列化算法扩展都必须实现该接口,其中默认使用的是Hessian序列化算法

序列化对于远程调用的响应速度、吞吐量、网络带宽消耗等同样也起着至关重要的作用,是提升分布式系统性能的最关键因素之一

判断一个编码框架的优劣主要从以下几个方面:

  1. 是否支持跨语言,支持语种是否丰富
  2. 编码后的码流
  3. 编解码的性能
  4. 类库是否小巧,API使用是否方便
  5. 使用者开发的工作量和难度

本项目计划实现5种序列化算法,分别为:JDK、JSON、HESSIAN、KRYO 、PROTOSTUFF,其中JSON使用GSON实现

五种序列化算法的比较如下:

序列化算法 优点 缺点
Kryo 速度快,序列化后体积小 跨语言支持较复杂
Hessian 默认支持跨语言 较慢
Protostuff 速度快,基于protobuf 需静态编译
Json 使用方便 性能一般
Jdk 使用方便,可序列化所有类 速度慢,占空间

Hessian序列化

public class HessianSerialization implements Serialization {
@Override
public <T> byte[] serialize(T object) {
try {
// 创建字节数组输出流(内存缓冲区)
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// 创建Hessian序列化输出流
HessianSerializerOutput hso = new HessianSerializerOutput(baos);
hso.writeObject(object);
hso.flush();
return baos.toByteArray();
} catch (IOException e) {
throw new SerializeException("Hessian serialize failed.", e);
}
}

@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
try {
// 将字节数组包装为输入流
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
// 创建 Hessian 反序列化输入流
HessianSerializerInput hsi = new HessianSerializerInput(bis);
return (T) hsi.readObject();
} catch (IOException e) {
throw new SerializeException("Hessian deserialize failed.", e);
}
}
}

Jdk序列化

public class JdkSerialization implements Serialization {
@Override
public <T> byte[] serialize(T object) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(object);
return baos.toByteArray();
} catch (IOException e) {
throw new SerializeException("Jdk serialize failed.", e);
}
}

@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
try {
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bais);
return (T) ois.readObject();
} catch (IOException | ClassNotFoundException e) {
throw new SerializeException("Jdk deserialize failed.", e);
}
}
}

Json序列化

由于Gson默认不能序列化Class对象,因为Class没有默认的序列化方案,举个例子,我们的RPCRequest类是这样的:

@Data
public class RpcRequest implements Serializable {

/**
* 服务名称:请求的服务名 + 版本
*/
private String serviceName;

/**
* 请求调用的方法名称
*/
private String method;

/**
* 参数类型
*/
private Class<?>[] parameterTypes;

/**
* 参数
*/
private Object[] parameterValues;

}

假设我们要远程调用的方法如下:

public User getUser(String id, int level)

那我们就会创建这个RpcRequest

RpcRequest request = new RpcRequest();
request.setMethod("getUser");
request.setParameterTypes(new Class<?>[]{String.class, int.class});
request.setParameterValues(new Object[]{"abc123", 5});

如果我们要使用Gson序列化这个类时,就可以自己注册一个 JsonSerializer<Class<?>>JsonDeserializer<Class<?>>,完全实现:

public class JsonSerialization implements Serialization {
/**
* 自定义 JavaClass 对象序列化,解决 Gson 无法序列化 Class 信息
*/
static class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {
// 反序列化 String -> Class
@SneakyThrows
@Override
public Class<?> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
String name = json.getAsString();
return Class.forName(name);
}

// 序列化 Class -> String (类名)
@Override
public JsonElement serialize(Class<?> src, Type typeOfSrc, JsonSerializationContext context) {
return new JsonPrimitive(src.getName());
}
}

@Override
public <T> byte[] serialize(T object) {
try {
Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new ClassCodec()).create();
String json = gson.toJson(object);
return json.getBytes(StandardCharsets.UTF_8);
} catch (Exception e) {
throw new SerializeException("Json serialize failed.", e);
}
}

@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
try {
Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new ClassCodec()).create();
String json = new String(bytes, StandardCharsets.UTF_8);
return gson.fromJson(json, clazz);
} catch (JsonSyntaxException e) {
throw new SerializeException("Json deserialize failed.", e);
}
}
}

Kryo序列化

为什么Kryo线程不安全?其他序列化方法为什么线程安全?这个问题我们之后单独开一个帖子说~

public class KryoSerialization implements Serialization {

// kryo 线程不安全,所以使用 ThreadLocal 保存 kryo 对象
private final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
// 注册需要序列化的类(提高性能并减少数据体积)
kryo.register(RpcRequest.class);
kryo.register(RpcResponse.class);
return kryo;
});

@Override
public <T> byte[] serialize(T object) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Output output = new Output(baos);
Kryo kryo = kryoThreadLocal.get(); // 获取当前线程的 Kryo 实例
// 将对象序列化为 byte 数组
kryo.writeObject(output, object);
// 清理线程变量,避免内存泄漏
kryoThreadLocal.remove();
return output.toBytes();
} catch (Exception e) {
throw new SerializeException("Kryo serialize failed.", e);
}
}

@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
try {
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
Input input = new Input(bais);
Kryo kryo = kryoThreadLocal.get();
// 将 byte 数组反序列化为 T 对象
T object = kryo.readObject(input, clazz);
kryoThreadLocal.remove();
return object;
} catch (Exception e) {
throw new SerializeException("Kryo deserialize failed.", e);
}
}
}

Protostuff序列化

Protostuff需要用一个LinkedBuffer作为临时写入区域,项目中提前分配一个共享的 BUFFER,避免每次都重新分配,提高性能

public class ProtostuffSerialization implements Serialization {

/**
* 提前分配好 Buffer,避免每次进行序列化都需要重新分配 buffer 内存空间
*/
private static final ThreadLocal<LinkedBuffer> BUFFER_THREAD_LOCAL =
ThreadLocal.withInitial(() -> LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));


@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public <T> byte[] serialize(T object) {
LinkedBuffer buffer = BUFFER_THREAD_LOCAL.get();
try {
// 获取对象的运行时 schema
Schema schema = RuntimeSchema.getSchema(object.getClass());
// 使用 protostuff 将对象转为字节数组
return ProtostuffIOUtil.toByteArray(object, schema, buffer);
} catch (Exception e) {
throw new SerializeException("Protostuff serialize failed.", e);
} finally {
// 使用完 buffer 后清空,避免内存泄漏
buffer.clear();
}
}

@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
try {
Schema<T> schema = RuntimeSchema.getSchema(clazz);
// 创建空对象并填充数据
T object = schema.newMessage();
ProtostuffIOUtil.mergeFrom(bytes, object, schema);
return object;
} catch (Exception e) {
throw new SerializeException("Protostuff deserialize failed.", e);
}
}
}

服务注册与发现

在分布式系统中,不同服务之间应该如何通信呢?传统的方式可以通过 HTTP 请求调用、保存服务端的服务列表等,这样做需要开发者主动感知到服务端暴露的信息,系统之间耦合严重。为了更好地将客户端和服务端解耦,以及实现服务优雅上线和下线,于是注册中心就出现了。在RPC框架中,主要是使用注册中心来实现服务注册和发现的功能。服务端节点上线后自行向注册中心注册服务列表,节点下线时需要从注册中心将节点元数据信息移除。客户端向服务端发起调用时,自己负责从注册中心获取服务端的服务列表,然后在通过负载均衡算法选择其中一个服务节点进行调用。

现在思考一个问题,服务在下线时需要从注册中心移除元数据,那么注册中心怎么才能感知到服务下线呢?我们最先想到的方法就是节点主动通知的实现方式,当节点需要下线时,向注册中心发送下线请求,让注册中心移除自己的元数据信息。但是如果节点异常退出,例如断网、进程崩溃等,那么注册中心将会一直残留异常节点的元数据,从而可能造成服务调用出现问题。

为了避免上述问题,实现服务优雅下线比较好的方式是采用主动通知 + 心跳检测的方案。除了主动通知注册中心下线外,还需要增加节点与注册中心的心跳检测功能,这个过程也叫作探活。心跳检测可以由节点或者注册中心负责,例如注册中心可以向服务节点每 60s 发送一次心跳包,如果 3 次心跳包都没有收到请求结果,可以任务该服务节点已经下线。

采用注册中心的好处是可以解耦客户端和服务端之间错综复杂的关系,并且能够实现对服务的动态管理。服务配置可以支持动态修改,然后将更新后的配置推送到客户端和服务端,无须重启任何服务。

本项目计划实现以Zookeeper为注册中心。

服务注册

关键字段:

// 会话超时时间,客户端在此时间内未与 zk 服务器保持心跳,则认为连接断开
private static final int SESSION_TIMEOUT = 60 * 1000;
// zk 客户端连接 zk 服务器时的连接超时时间
private static final int CONNECT_TIMEOUT = 15 * 1000;
// 超时重试间隔时间,用于指数回退重试策略
private static final int BASE_SLEEP_TIME = 3 * 1000;
// 最大重试次数
private static final int MAX_RETRY = 10;
// zk中所有服务注册的根路径
private static final String BASE_PATH = "/rpc";
// Curator 提供的 zk 客户端对象,封装了 zookeeper 的连接管理、重试、监听等功能
private CuratorFramework client;
// Curator 的服务发现组件,用于查询、注册、注销服务实例
private ServiceDiscovery<ServiceInfo> serviceDiscovery;

构造方法,传入Zookeeper地址并进行初始化:

public ZookeeperServiceRegistry(String registryAddress) {
try {
// 创建zk客户端示例
client = CuratorFrameworkFactory
.newClient(registryAddress, SESSION_TIMEOUT, CONNECT_TIMEOUT,
new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRY));
// 开启客户端通信
client.start();

// 构建 ServiceDiscovery 服务注册中心
serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceInfo.class)
.client(client)
.serializer(new JsonInstanceSerializer<>(ServiceInfo.class))
.basePath(BASE_PATH)
.build();

serviceDiscovery.start();
} catch (Exception e) {
log.error("An error occurred while starting the zookeeper registry: ", e);
}
}

注册方法,将服务注册到zk中:

public void register(ServiceInfo serviceInfo) {
try {
ServiceInstance<ServiceInfo> serviceInstance = ServiceInstance.<ServiceInfo>builder()
.name(serviceInfo.getServiceName())
.address(serviceInfo.getAddress())
.port(serviceInfo.getPort())
.payload(serviceInfo)
.build();
serviceDiscovery.registerService(serviceInstance);
log.info("Successfully registered [{}] service.", serviceInstance.getName());
} catch (Exception e) {
throw new RpcException(String.format("An error occurred when rpc server registering [%s] service.",
serviceInfo.getServiceName()), e);
}
}

服务注销与删除方法,

public void unregister(ServiceInfo serviceInfo) throws Exception {
ServiceInstance<ServiceInfo> serviceInstance = ServiceInstance.<ServiceInfo>builder()
.name(serviceInfo.getServiceName())
.address(serviceInfo.getAddress())
.port(serviceInfo.getPort())
.payload(serviceInfo)
.build();
serviceDiscovery.unregisterService(serviceInstance);
log.warn("Successfully unregistered {} service.", serviceInstance.getName());
}

public void destroy() throws Exception {
serviceDiscovery.close();
client.close();
log.info("Destroy zookeeper registry completed.");
}

服务发现

关键字段:

private static final int SESSION_TIMEOUT = 60 * 1000;

private static final int CONNECT_TIMEOUT = 15 * 1000;

private static final int BASE_SLEEP_TIME = 3 * 1000;

private static final int MAX_RETRY = 10;

private static final String BASE_PATH = "/rpc";
// 负载均衡接口
private LoadBalance loadBalance;

private CuratorFramework client;

private org.apache.curator.x.discovery.ServiceDiscovery<ServiceInfo> serviceDiscovery;
/**
* ServiceCache: 将在zk中的服务数据缓存至本地,并监听服务变化,实时更新缓存
* 服务本地缓存,将服务缓存到本地并增加 watch 事件,当远程服务发生改变时自动更新服务缓存
*/
private final Map<String, ServiceCache<ServiceInfo>> serviceCacheMap = new ConcurrentHashMap<>();
// 将服务列表缓存到本地内存,当服务发生变化时,由 serviceCache 进行服务列表更新操作,当 zk 挂掉时,将保存当前服务列表以便继续提供服务
private final Map<String, List<ServiceInfo>> serviceMap = new ConcurrentHashMap<>();

构造方法,初始化zk客户端和服务发现对象:

public ZookeeperServiceDiscovery(String registryAddress, LoadBalance loadBalance) {
try {
this.loadBalance = loadBalance;

// 创建zk客户端示例
client = CuratorFrameworkFactory
.newClient(registryAddress, SESSION_TIMEOUT, CONNECT_TIMEOUT,
new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRY));
// 开启客户端通信
client.start();

// 构建 ServiceDiscovery 服务注册中心
serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceInfo.class)
.client(client)
.serializer(new JsonInstanceSerializer<>(ServiceInfo.class))
.basePath(BASE_PATH)
.build();
// 开启 服务发现
serviceDiscovery.start();
} catch (Exception e) {
log.error("An error occurred while starting the zookeeper discovery: ", e);
}
}

获取服务列表:

public List<ServiceInfo> getServices(String serviceName) throws Exception {
if (!serviceMap.containsKey(serviceName)) {
// 首次调用时,为该服务名构建一个本地缓存,并开始监听 zk 目录下该服务名节点的变化
ServiceCache<ServiceInfo> serviceCache = serviceDiscovery.serviceCacheBuilder()
.name(serviceName)
.build();
// 添加服务监听,当服务发生变化时主动更新本地缓存并通知
serviceCache.addListener(new ServiceCacheListener() {
@Override
public void cacheChanged() {
log.info("The service [{}] cache has changed. The current number of service samples is {}."
, serviceName, serviceCache.getInstances().size());
// 更新本地缓存的服务列表
serviceMap.put(serviceName, serviceCache.getInstances().stream()
.map(ServiceInstance::getPayload)
.collect(Collectors.toList()));
}

@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
// 当连接状态发生改变时,只打印提示信息,保留本地缓存的服务列表
log.info("The client {} connection status has changed. The current status is: {}."
, client, newState);
}
});
// 开启服务缓存监听
serviceCache.start();
// 将服务缓存对象存入本地
serviceCacheMap.put(serviceName, serviceCache);
// 将服务列表缓存到本地
serviceMap.put(serviceName, serviceCacheMap.get(serviceName).getInstances()
.stream()
.map(ServiceInstance::getPayload)
.collect(Collectors.toList()));
}
return serviceMap.get(serviceName);
}

选择服务实例,根据服务名 + 负载均衡策略选择一个可用的 ServiceInfo 服务实例:

public ServiceInfo discover(RpcRequest request) {
try {
return loadBalance.select(getServices(request.getServiceName()), request);
} catch (Exception e) {
throw new RpcException(String.format("Remote service discovery did not find service %s.",
request.getServiceName()), e);
}
}

清理所有zk相关连接资源,确保优雅下线:

public void destroy() throws Exception {
for (ServiceCache<ServiceInfo> serviceCache : serviceCacheMap.values()) {
if (serviceCache != null) {
serviceCache.close();
}
}
if (serviceDiscovery != null) {
serviceDiscovery.close();
}
if (client != null) {
client.close();
}
}

负载均衡算法

在分布式系统中,服务提供者和服务消费者都会有多台节点,如何保证服务提供者所有节点的负载均衡呢?

客户端在发起调用之前,需要感知有多少服务端节点可用,然后从中选取一个进行调用。客户端需要拿到服务端节点的状态信息,并根据不同的策略实现负载均衡算法

  • Round-Robin轮询:依次轮询服务端节点
  • Weighted Round-Robin 权重轮询:对不同负载水平的服务端节点增加权重系数,这样可以通过权重系数降低性能较差或者配置较低的节点流量
  • Least Connections 最少连接数:客户端根据服务端节点当前的连接数进行负载均衡,客户端会选择连接数最少的一台服务器进行调用。Least Connections策略只是服务端其中一种维度,我们可以演化出最少请求数、CPU利用率最低等其他维度的负载均衡方案
  • Consistent Hash 一致性 Hash:目前主流推荐的负载均衡策略,Consistent Hash 是一种特殊的 Hash 算法,在服务端节点扩容或者下线时,尽可能保证客户端请求还是固定分配到同一台服务器节点。Consistent Hash 算法是采用哈希环来实现的,通过 Hash 函数将对象和服务器节点放置在哈希环上,一般来说服务器可以选择 IP + Port 进行 Hash,然后为对象选择对应的服务器节点,在哈希环中顺时针查找距离对象 Hash 值最近的服务器节点

本项目实现Random、Round-Robin、Consistent Hash三种负载均衡算法

首先编写一个抽象类,定义实现负载均衡的方法:

public abstract class AbstractLoadBalance implements LoadBalance {
@Override
public ServiceInfo select(List<ServiceInfo> invokers, RpcRequest request) {
if (invokers == null || invokers.isEmpty()) {
return null;
}
// 如果服务列表中只有一个服务,无需进行负载均衡,直接返回
if (invokers.size() == 1) {
return invokers.get(0);
}
// 进行负载均衡,由具体的子类实现
return doSelect(invokers, request);
}

/**
* 实现具体负载均衡策略的选择
*
* @param invokers 服务列表
* @param request rpc 请求
* @return 返回服务信息
*/
protected abstract ServiceInfo doSelect(List<ServiceInfo> invokers, RpcRequest request);
}

随机负载均衡

随机选择一个节点即可

public class RandomLoadBalance extends AbstractLoadBalance {

final Random random = new Random();

@Override
protected ServiceInfo doSelect(List<ServiceInfo> invokers, RpcRequest request) {
return invokers.get(random.nextInt(invokers.size()));
}
}

轮询负载均衡

public class RoundRobinLoadBalance extends AbstractLoadBalance {

private static final AtomicInteger atomicInteger = new AtomicInteger(0);

@Override
public ServiceInfo doSelect(List<ServiceInfo> invokers, RpcRequest request) {
return invokers.get(getAndIncrement() % invokers.size());
}

/**
* 返回当前值并加一,通过 CAS 原子更新,当前值到达 {@link Integer#MAX_VALUE} 时,重新设值为 0
* @return 返回当前的值
*/
public final int getAndIncrement() {
int prev, next;
do {
prev = atomicInteger.get();
next = prev == Integer.MAX_VALUE ? 0 : prev + 1;
} while (!atomicInteger.compareAndSet(prev, next)); //把prev更新成next
return prev;
}
}

一致性哈希负载均衡

在分布式系统中,负载均衡是将请求分发到多个服务节点上的关键手段。如果使用简单的哈希方式(比如:hash(key) % 节点数量),当服务节点发生变化(如新增或下线一个服务节点),所有请求的分发都会大范围变化,这会导致:

  • 原本某个 key 缓存在 A 节点,新请求可能被分配到 B,导致缓存失效
  • 用户粘性丧失,会话丢失
  • 系统效率大幅下降

所以我们希望:节点数量变化时,尽量少地影响原有请求的分配规律

一致性哈希算法就是为此设计的,它有两个目标:

  1. 相同的请求key总是路由到同一个节点(请求一致性)
  2. 节点变动时,只有极少数 key被重新分配(低扰动性)

一致性哈希的基本原理

  1. 将哈希空间想象成一个“环”
  • 假设 hash 值范围是 0 ~ 2³²-1,我们可以把它画成一个圆环
  • 所有节点(服务实例)都通过哈希函数映射到这个环上某个位置
  1. 将请求(根据其 key)也映射到环上
  • 请求经过哈希函数也会得到一个 hash 值,在环上的某个点
  1. 如何选择目标节点?
  • 从请求所在的 hash 点开始,顺时针查找,直到找到第一个节点
  • 这个节点就是这个请求应该被分配到的服务节点

举个例子:

假设环上有 3 个节点:

  • A → 哈希值 1000
  • B → 哈希值 4000
  • C → 哈希值 7000

如果请求 key 哈希值为 4200,它将落在 C(7000)上

如果 key 哈希值为 8000,环上没有节点比它大,就从头开始找,第一个是 A(1000),所以分到 A

虚拟节点机制

真实服务节点数量较少(如3~5个),哈希分布不均匀,可能会出现某个节点承载大量请求的情况,导致负载不均

虚拟节点的做法:

  • 每个真实节点被映射成多个虚拟节点(通常是160个)
  • 每个虚拟节点 hash 后也放在环上
  • 请求还是通过 hash 值查找最近的虚拟节点,然后找出它对应的真实节点

效果

  • 请求被均匀地分布到多个虚拟节点上
  • 从而间接实现真实节点的负载均衡

处理节点变更问题

新增节点:

  • 会在环上增加一些新的虚拟节点
  • 新虚拟节点会“截断”一小段原本属于其他节点的请求范围
  • 只有这一小部分请求分配发生变化,其他绝大部分 key 对应的服务节点不变

删除节点:

  • 其虚拟节点从环上移除
  • 这些虚拟节点对应的请求会顺时针路由到下一个节点
  • 同样只影响少部分请求

具体实现

编写静态内部类,实现一致性哈希的核心部分,维护一个虚拟节点环来实现平衡性和低扰动性

private final static class ConsistentHashSelector {
/**
* 使用 TreeMap 存储虚拟节点(virtualInvokers 需要提供高效的查询操作,因此选用 TreeMap 作为存储结构,底层使用红黑树,比HashMap查询速度要快)
*/
private final TreeMap<Long, ServiceInfo> virtualInvokers;

/**
* invokers 的原始哈希码
*/
private final int identityHashCode;

/**
* 构建一个 ConsistentHashSelector 对象
*
* @param invokers 存储虚拟节点
* @param replicaNumber 虚拟节点数,默认为 160
* @param identityHashCode invokers 的原始哈希码
*/
public ConsistentHashSelector(List<ServiceInfo> invokers, int replicaNumber, int identityHashCode) {
this.virtualInvokers = new TreeMap<>();
this.identityHashCode = identityHashCode;

for (ServiceInfo invoker : invokers) {
String address = invoker.getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
// 对 address + i 进行 md5 运算,得到一个长度为16的字节数组
byte[] digest = md5(address + i);
// 对 digest 部分字节进行4次 hash 运算,得到四个不同的 long 型正整数
for (int h = 0; h < 4; h++) {
// h = 0 时,取 digest 中下标为 0 ~ 3 的4个字节进行位运算
// h = 1 时,取 digest 中下标为 4 ~ 7 的4个字节进行位运算
// h = 2, h = 3 时过程同上
long m = hash(digest, h);
// 将 hash 到 invoker 的映射关系存储到 virtualInvokers 中
virtualInvokers.put(m, invoker);
}
}
}
}

/**
* 进行 md5 运算,返回摘要字节数组
* @param key 编码字符串 key
* @return 编码后的摘要内容,长度为 16 的字节数组
*/
private byte[] md5(String key) {
MessageDigest md;
try {
md = MessageDigest.getInstance("MD5");
byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
md.update(bytes);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
return md.digest();
}

/**
* 根据摘要生成 hash 值
* @param digest md5摘要内容
* @param number 当前索引数
* @return hash 值
*/
private long hash(byte[] digest, int number) {
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[number * 4] & 0xFF))
& 0xFFFFFFFFL;
}

public ServiceInfo select(String key) {
// 对参数 key 进行 md5 运算
byte[] digest = md5(key);
// 取 digest 数组的前四个字节进行 hash 运算,再将 hash 值传给 selectForKey 方法,
// 寻找合适的 Invoker
return selectForKey(hash(digest, 0));
}

/**
* 得到第一个大于等于 hash 值的服务信息,若没有则返回第一个
*
* @param hash 哈希值
* @return 服务信息
*/
private ServiceInfo selectForKey(long hash) {
// 找到 TreeMap 中查找第一个节点值大于或等于当前 hash 的 Invoker
Map.Entry<Long, ServiceInfo> entry = virtualInvokers.ceilingEntry(hash);
// 如果 hash 大于 Invoker 在圆环上最大的位置,此时 entry = null,需要将 TreeMap 的头节点赋值给 entry
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
}

哈希方法步骤:

  1. 用MD5算法生成摘要:对IP地址+端口号执行MD5,得到长度16字节的摘要
  2. 从摘要中提取出多个32-bit的无符号整数(即虚拟节点哈希值)
    • 每次从 16 字节的摘要中,取出连续的 4 字节
    • 对这四个字节通过位运算拼接成一个32位的整数,代表一个hash值
    • 执行4次,得到4个hash值

具体轮询方法实现:

public ServiceInfo doSelect(List<ServiceInfo> invokers, RpcRequest request) {
// 得到请求的方法名称
String method = request.getMethod();
// 构建对应的 key 值,key = 全限定类名 + "." + 方法名,比如 com.xxx.DemoService.sayHello
String key = request.getServiceName() + "." + method;
// 获取 invokers 原始的 hashCode
int identityHashCode = System.identityHashCode(invokers);
// 从 map 从获取对应的 selector
ConsistentHashSelector selector = selectors.get(key);
// 如果为 null,表示之前没有缓存过,如果 hashcode 不一致,表示缓存的服务列表发生变化
if (selector == null || selector.identityHashCode != identityHashCode) {
// 创建新的 selector 并缓存
selectors.put(key, new ConsistentHashSelector(invokers, 160, identityHashCode));
selector = selectors.get(key);
}
// 调用 ConsistentHashSelector 的 select 方法选择 Invoker
String selectKey = key;
// 将 key 与 方法参数进行 hash 运算,因此 ConsistentHashLoadBalance 的负载均衡逻辑只受参数值影响,
// 具有相同参数值的请求将会被分配给同一个服务提供者。ConsistentHashLoadBalance 不关系权重
if (request.getParameterValues() != null && request.getParameterValues().length > 0) {
selectKey += Arrays.stream(request.getParameterValues());
}
return selector.select(selectKey);
}

根据方法名和参数生成哈希键,从缓存中获取一致性哈希选择器,根据选择器将相同参数请求始终路由到同一个服务节点,实现基于参数的一致性哈希负载均衡

网络通信模块

本项目实现了基于Netty、Http、Socket三种网络通信方式,关于这三种通信方式的区别,我会在另一个文章中说明

Netty通信

首先编写通信处理入口,使用Netty启动TCP服务器监听指定端口:

public class NettyRpcServer implements RpcServer {

@SneakyThrows
@Override
public void start(Integer port) {
// boss 处理 accept 事件
EventLoopGroup boss = new NioEventLoopGroup();
// worker 处理 read/write 事件
EventLoopGroup worker = new NioEventLoopGroup();

try {
InetAddress inetAddress = InetAddress.getLocalHost();

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
// TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。
.childOption(ChannelOption.TCP_NODELAY, true)
// 是否开启 TCP 底层心跳机制
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new LoggingHandler(LogLevel.DEBUG))
// 当客户端第一次请求时才会进行初始化
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 30s内没有收到客户端的请求就关闭连接,会触发一个 IdleState#READER_IDLE 事件
ch.pipeline().addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
ch.pipeline().addLast(new RpcFrameDecoder());
ch.pipeline().addLast(new SharableRpcMessageCodec());
ch.pipeline().addLast(new NettyRpcRequestHandler());
}
});
// 绑定端口,同步等待绑定成功
ChannelFuture channelFuture = serverBootstrap.bind(inetAddress, port).sync();
log.debug("Rpc server add {} started on the port {}.", inetAddress, port);
// 等待服务端监听端口关闭
channelFuture.channel().closeFuture().sync();
} catch (UnknownHostException | InterruptedException e) {
log.error("An error occurred while starting the rpc service.", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}

在handler中我加入了NettyRpcRequestHandler,作为具体的业务实现类:

public class NettyRpcRequestHandler extends SimpleChannelInboundHandler<RpcMessage> {

private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 20, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000));

private final RpcRequestHandler rpcRequestHandler;

public NettyRpcRequestHandler() {
this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class);
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcMessage msg) throws Exception {
threadPool.submit(() -> {
try {
RpcMessage responseRpcMessage = new RpcMessage();
MessageHeader header = msg.getHeader();
MessageType type = MessageType.parseByType(header.getMessageType());
log.debug("The message received by the server is: {}", msg.getBody());
// 如果是心跳检测请求信息
if (type == MessageType.HEARTBEAT_REQUEST) {
header.setMessageType(MessageType.HEARTBEAT_RESPONSE.getType());
header.setMessageStatus(MessageStatus.SUCCESS.getCode());
// 设置响应头部信息
responseRpcMessage.setHeader(header);
responseRpcMessage.setBody(ProtocolConstants.PONG);
} else { // 处理 Rpc 请求信息
RpcRequest request = (RpcRequest) msg.getBody();
RpcResponse response = new RpcResponse();
// 设置头部消息类型
header.setMessageType(MessageType.RESPONSE.getType());
// 反射调用
try {
// 获取本地反射调用结果
Object result = rpcRequestHandler.handleRpcRequest(request);
response.setReturnValue(result);
header.setMessageStatus(MessageStatus.SUCCESS.getCode());
} catch (Exception e) {
log.error("The service [{}], the method [{}] invoke failed!", request.getServiceName(), request.getMethod());
// 若不设置,堆栈信息过多,导致报错
response.setExceptionValue(new RpcException("Error in remote procedure call, " + e.getMessage()));
header.setMessageStatus(MessageStatus.FAIL.getCode());
}
// 设置响应头部信息
responseRpcMessage.setHeader(header);
responseRpcMessage.setBody(response);
}
log.debug("responseRpcMessage: {}.", responseRpcMessage);
// 将结果写入,传递到下一个处理器
ctx.writeAndFlush(responseRpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} finally {
// 确保 ByteBuf 被释放,防止发生内存泄露
ReferenceCountUtil.release(msg);
}
});
}

/**
* 自定义事件,当触发读空闲时,自动关闭客户端channel连接,netty自动调用
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.READER_IDLE) {
log.warn("idle check happen, so close the connection.");
ctx.close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("server catch exception");
cause.printStackTrace();
ctx.close();
}
}

Http通信

编写http通信实现类,通过内置Tomcat实现:

public class HttpRpcServer implements RpcServer {

@Override
public void start(Integer port) {
try {
Tomcat tomcat = new Tomcat();

Server server = tomcat.getServer();
Service service = server.findService("Tomcat");

Connector connector = new Connector();
connector.setPort(port);

String hostname = InetAddress.getLocalHost().getHostAddress();

StandardEngine engine = new StandardEngine();
engine.setDefaultHost(hostname);

StandardHost host = new StandardHost();
host.setName(hostname);

String contextPath = "";
Context context = new StandardContext();
context.setPath(contextPath);
context.addLifecycleListener(new Tomcat.FixContextListener());

host.addChild(context);
engine.addChild(host);

service.setContainer(engine);
service.addConnector(connector);

tomcat.addServlet(contextPath, "dispatcher", new DispatcherServlet());
context.addServletMappingDecoded("/*", "dispatcher");

tomcat.start();
tomcat.getServer().await();
} catch (LifecycleException | UnknownHostException e) {
throw new RpcException("Tomcat server failed to start.", e);
}
}
}

Tomcat接收到http请求后,会交给DispatcherServlet类进行统一处理:

public class DispatcherServlet extends HttpServlet {

/**
* 当前处理器数量
*/
private final int cpuNum = Runtime.getRuntime().availableProcessors();

// 创建线程池
private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(cpuNum * 2, cpuNum * 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000));

@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
HttpRpcRequestHandler handler = SingletonFactory.getInstance(HttpRpcRequestHandler.class);
threadPool.submit(() -> handler.handle(req, resp));
}
}

然后由DispatcherServlet将请求发给HttpRpcRequestHandler进行实际业务处理:

public class HttpRpcRequestHandler {

private final RpcRequestHandler rpcRequestHandler;

public HttpRpcRequestHandler() {
this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class);
}

public void handle(HttpServletRequest req, HttpServletResponse resp) {
try {
ObjectInputStream ois = new ObjectInputStream(req.getInputStream());
ObjectOutputStream oos = new ObjectOutputStream(resp.getOutputStream());
// 读取客户端请求
RpcRequest request = (RpcRequest) ois.readObject();
log.debug("The server received message is {}.", request);
// 创建一个 RpcResponse 对象来响应客户端
RpcResponse response = new RpcResponse();
// 处理请求
try {
// 获取请求的服务对应的实例对象反射调用方法的结果
Object result = rpcRequestHandler.handleRpcRequest(request);
response.setReturnValue(result);
} catch (Exception e) {
log.error("The service [{}], the method [{}] invoke failed!", request.getServiceName(), request.getMethod());
// 若不设置,堆栈信息过多,导致报错
response.setExceptionValue(new RpcException("Error in remote procedure call, " + e.getMessage()));
}
log.debug("The response is {}.", response);
oos.writeObject(response);
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("The http server failed to handle client rpc request.", e);
}
}
}

Socket通信

socket通信是最原始的通信方式,首先编写server类:

public class SocketRpcServer implements RpcServer {

/**
* 当前处理器数量
*/
private final int cpuNum = Runtime.getRuntime().availableProcessors();

// 线程大小:这一点要看我们执行的任务是cpu密集型,还是io密集型
// 如果有关于计算机计算,比较消耗资源的是cpu密集型,线程大小应该设置为:cpu 核数 + 1
// 如果有关网络传输,连接数据库等,是io密集型,线程大小应该设置为:cpu * 2
private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(cpuNum * 2, cpuNum * 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000));

@Override
public void start(Integer port) {
try (ServerSocket serverSocket = new ServerSocket()) {
String hostAddress = InetAddress.getLocalHost().getHostAddress();
serverSocket.bind(new InetSocketAddress(hostAddress, port));
Socket socket;
// 循环接受客户端 Socket 连接(accept为阻塞时等待连接)
while ((socket = serverSocket.accept()) != null) {
log.debug("The client connected [{}].", socket.getInetAddress());
threadPool.execute(new SocketRpcRequestHandler(socket));
}
// 服务端连断开,关闭线程池
threadPool.shutdown();
} catch (IOException e) {
throw new RpcException(String.format("The socket server failed to start on port %d.", port), e);
}
}
}

然后是具体的请求处理逻辑,实现Runnable接口,配合线程池执行每一个客户端连接的请求

public class SocketRpcRequestHandler implements Runnable {
private final Socket socket;

private final RpcRequestHandler rpcRequestHandler;

public SocketRpcRequestHandler(Socket socket) {
this.socket = socket;
this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class);
}

@Override
public void run() {
log.debug("The server handle client message by thread {}.", Thread.currentThread().getName());
try (ObjectInputStream ois = new ObjectInputStream(socket.getInputStream())) {
// 注意:SocketServer 接受和发送的数据为:RpcRequest, RpcResponse
ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
// 直接读取客户端发送过来的 RpcRequest,此时不需要进行编解码,无需消息协议
RpcRequest request = (RpcRequest) ois.readObject();
log.debug("The server received message is {}.", request);
// 创建一个 RpcResponse 对象用来响应给客户端
RpcResponse response = new RpcResponse();
// 处理请求
try {
// 获取请求的服务对应的实例对象反射调用方法的结果
Object result = rpcRequestHandler.handleRpcRequest(request);
response.setReturnValue(result);
} catch (Exception e) {
log.error("The service [{}], the method [{}] invoke failed!", request.getServiceName(), request.getMethod());
// 若不设置,堆栈信息过多,导致报错
response.setExceptionValue(new RpcException("Error in remote procedure call, " + e.getMessage()));
}
log.debug("The response is {}.", response);
oos.writeObject(response);
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("The socket server failed to handle client rpc request.", e);
}
}
}

动态代理

RPC框架怎么做到像调用本地接口一样调用远端服务呢?这必须依赖动态代理来实现

需要创建一个代理对象,在代理对象中完成数据报文编码,然后发起调用发送数据给服务提供方,以此屏蔽RPC框架的调用细节。因为代理类是在运行时生成的,所以代理类的生成速度、生成的字节码大小都会影响RPC框架整体的性能和资源消耗

动态代理比较主流的实现方案有以下几种:JDK、Cglib、Javassist、ASM、ByteBuddy

  • JDK:在运行时可以动态创建代理类,但是 JDK 动态代理的功能比较局限,代理对象必须实现一个接口,否则抛出异常。因为代理类会继承Proxy类,然而 Java 是不支持多重继承的,只能通过接口实现多态。JDK 动态代理所生成的代理类是接口的实现类,不能代理接口中不存在的方法。JDK 动态代理是通过反射调用的形式代理类中的方法,比直接调用肯定是性能要慢的
  • Cglib:Cglib是基于ASM字节码生成框架实现的,通过字节码技术生成的代理类,所以代理类的类型是不受限制的。而且Cglib生成的代理类是继承于被代理类,所以可以提供更加灵活的功能。在代理方法方面,Cglib 是有优势的,它采用了 FastClass 机制,为代理类和被代理类各自创建一个 Class,这个 Class会为代理类和被代理类的方法分配 index 索引,FastClass 就可以通过 index 直接定位要调用的方法,并直接调用,这是一种空间换时间的优化思路
  • Javassist和ASM。二者都是Java字节码操作框架,使用起来难度较大,需要开发者对 Class 文件结构以及 JVM 都有所了解,但是它们都比反射的性能要高
  • Byte Buddy 也是一个字节码生成和操作的类库,Byte Buddy 功能强大,相比于 Javassist 和 ASM,Byte Buddy 提供了更加便捷的 API,用于创建和修改 Java 类,无须理解字节码的格式,而且 Byte Buddy 更加轻量,性能更好

本项目实现了JDK和CGLIB动态代理

具体的代理逻辑:

public class ClientStubProxyFactory {

//服务发现中心实现类
private final ServiceDiscovery discovery;

// RpcClient 传输实现类
private final RpcClient rpcClient;

// 客户端配置属性
private final RpcClientProperties properties;

public ClientStubProxyFactory(ServiceDiscovery discovery, RpcClient rpcClient, RpcClientProperties properties) {
this.discovery = discovery;
this.rpcClient = rpcClient;
this.properties = properties;
}

// 代理对象缓存
private static final Map<String, Object> proxyMap = new ConcurrentHashMap<>();

/**
* 获取代理对象
*
* @param clazz 服务接口类型
* @param version 版本号
* @param <T> 代理对象的参数类型
* @return 对应版本的代理对象
*/
public <T> T getProxy(Class<T> clazz, String version) {
return (T) proxyMap.computeIfAbsent(ServiceUtil.serviceKey(clazz.getName(), version), serviceName -> {
// 如果目标类是一个接口或者 是 java.lang.reflect.Proxy 的子类 则默认使用 JDK 动态代理
if (clazz.isInterface() || Proxy.isProxyClass(clazz)) {
return Proxy.newProxyInstance(clazz.getClassLoader(),
new Class[]{clazz}, // 注意,这里的接口是 clazz 本身(要代理的实现类所实现的接口)
new ClientStubInvocationHandler(discovery, rpcClient, properties, serviceName));
} else { // 使用 CGLIB 动态代理
// 创建动态代理增加类
Enhancer enhancer = new Enhancer();
// 设置类加载器
enhancer.setClassLoader(clazz.getClassLoader());
// 设置被代理类
enhancer.setSuperclass(clazz);
// 设置方法拦截器
enhancer.setCallback(new ClientStubMethodInterceptor(discovery, rpcClient, properties, serviceName));
// 创建代理类
return enhancer.create();
}
});
}
}

Netty心跳机制与Channel复用

为了解决每次请求客户端都要重新与服务端建立netty连接,非常耗时,增加心跳检查机制,保持长连接,复用channel连接;

  • 长连接:避免了每次调用新建TCP连接,提高了调用的响应速度
  • Channel 连接复用:避免重复连接服务端
  • 多路复用:单个TCP连接可交替传输多个请求和响应的消息,降低了连接的等待闲置时间,从而减少了同样并发数下的网络连接数,提高了系统吞吐量

多路复用实现

使用一个ConcurrentHashMap存放每个请求的等待结果区,当响应回来时,用sequenceId找到对应的Promise完成结果

public static final Map<Integer, Promise<RpcMessage>> UNPROCESSED_RPC_RESPONSES = new ConcurrentHashMap<>();

拿到已经返回结果的Promise:

protected void channelRead0(ChannelHandlerContext ctx, RpcMessage msg) throws Exception {
try {
MessageType type = MessageType.parseByType(msg.getHeader().getMessageType());
// 如果是 RpcResponse 请求
if (type == MessageType.RESPONSE) {
int sequenceId = msg.getHeader().getSequenceId();
// 拿到还未执行完成的 promise 对象
Promise<RpcMessage> promise = UNPROCESSED_RPC_RESPONSES.remove(sequenceId);
if (promise != null) {
Exception exception = ((RpcResponse) msg.getBody()).getExceptionValue();
if (exception == null) {
promise.setSuccess(msg);
} else {
promise.setFailure(exception);
}
}
} else if (type == MessageType.HEARTBEAT_RESPONSE) { // 如果是心跳检查请求
log.debug("Heartbeat info {}.", msg.getBody());
}
} finally {
// 释放内存,防止内存泄漏
ReferenceCountUtil.release(msg);
}
}

发出请求后,存放还未返回结果的Promise:

public RpcMessage sendRpcRequest(RequestMetadata requestMetadata) {
// 构建接收返回结果的 promise
Promise<RpcMessage> promise;
// 获取 Channel 对象
Channel channel = getChannel(new InetSocketAddress(requestMetadata.getServerAddr(), requestMetadata.getPort()));
if (channel.isActive()) {
// 创建 promise 来接受结果 指定执行完成通知的线程
promise = new DefaultPromise<>(channel.eventLoop());
// 获取请求的序列号 ID
int sequenceId = requestMetadata.getRpcMessage().getHeader().getSequenceId();
// 存入还未处理的请求
RpcResponseHandler.UNPROCESSED_RPC_RESPONSES.put(sequenceId, promise);
// 发送数据并监听发送状态
channel.writeAndFlush(requestMetadata.getRpcMessage()).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
log.debug("The client send the message successfully, msg: [{}].", requestMetadata);
} else {
future.channel().close();
promise.setFailure(future.cause());
log.error("The client send the message failed.", future.cause());
}
});

Integer timeout = requestMetadata.getTimeout();

// 等待结果返回(让出cpu资源,同步阻塞调用线程main,其他线程去执行获取操作(eventLoop))
// 如果没有指定超时时间,则 await 直到 promise 完成
if (timeout == null || timeout <= 0) {
promise.await();
} else {
// 在指定超时时间内等待结果返回
boolean success = promise.await(requestMetadata.getTimeout(), TimeUnit.MILLISECONDS);
if (!success) {
promise.setFailure(new TimeoutException(String.format("The Remote procedure call exceeded the " +
"specified timeout of %dms.", timeout)));
}
}
if (promise.isSuccess()) {
// 返回响应结果
return promise.getNow();
} else {
throw new RpcException(promise.cause());
}
} else {
throw new IllegalStateException("The channel is inactivate.");
}
}

长连接实现

在client的响应消息处理器中添加自定义时间处理器,当检测到写空闲发生时自动发送一个心跳包:

/**
* 用户自定义事件处理器,处理写空闲,当检测到写空闲发生自动发送一个心跳检测数据包
*
* @param ctx ctx
* @param evt evt
* @throws Exception ex
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
if (((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) {
log.warn("Write idle happen [{}].", ctx.channel().remoteAddress());
// 构造 心跳检查 RpcMessage
RpcMessage rpcMessage = new RpcMessage();
MessageHeader header = MessageHeader.build(SerializationType.KRYO.name());
header.setMessageType(MessageType.HEARTBEAT_REQUEST.getType());
rpcMessage.setHeader(header);
rpcMessage.setBody(ProtocolConstants.PING);
// 发送心跳检测请求
ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {
super.userEventTriggered(ctx, evt);
}
}

Channel连接复用

使用ConcurrentHashMap存放每个IP+端口号使用过的Channel,如果当前IP的端口再次发起rpc请求后,检测到之前的Channel还没关闭,就可以再次使用,避免重复连接服务端

public class ChannelProvider {

/**
* 存储 Channel,key 为 ip:port,val 为 channel 对象
*/
private final Map<String, Channel> channels = new ConcurrentHashMap<>();

public Channel get(String hostname, Integer port) {
String key = hostname + ":" + port;
// 如果之前对应的 ip port 已经建立了 channel
if (channels.containsKey(key)) {
// 取出 channel
Channel channel = channels.get(key);
// 如果 channel 不为 null,并且处于活跃状态(连接状态)
if (channel != null && channel.isActive()) {
return channel;
} else {
// 为 null 或者已经关闭连接,从 map 中移除
channels.remove(key);
}
}
return null;
}

public Channel get(InetSocketAddress inetSocketAddress) {
return get(inetSocketAddress.getHostName(), inetSocketAddress.getPort());
}

public void set(String hostname, Integer port, Channel channel) {
String key = hostname + ":" + port;
channels.put(key, channel);
}

public void set(InetSocketAddress inetSocketAddress, Channel channel) {
this.set(inetSocketAddress.getHostName(), inetSocketAddress.getPort(), channel);
}
}

集成Spring自定义注解

首先编写RpcService注解类,提供服务接口、接口名和版本等元数据,供框架进行服务发现、注册和远程调用时使用

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RpcService {

/**
* 对外暴露服务的接口类型,默认为 void.class
*/
Class<?> interfaceClass() default void.class;

/**
* 对外暴露服务的接口名(全限定名),默认为 ""
*/
String interfaceName() default "";

/**
* 版本号,默认 1.0
*/
String version() default "1.0";
}

然后编写Spring扩展机制中的一个实现类,扫描被@RpcService标注的组件并将对应的BeanDefiniton对象注册到Spring

public class RpcBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware {

private ResourceLoader resourceLoader;

@Override
public void setResourceLoader(ResourceLoader resourceLoader) {
this.resourceLoader = resourceLoader;
}

/**
* 此方法会在 spring 自定义扫描执行之后执行,这个时候 beanDefinitionMap 已经有扫描到的 beanDefinition 对象了
*
* @param annotationMetadata annotation metadata of the importing class
* @param registry current bean definition registry
*/
@Override
public void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry registry) {
// 获取 RpcComponentScan 注解的属性和值
AnnotationAttributes annotationAttributes = AnnotationAttributes
.fromMap(annotationMetadata.getAnnotationAttributes(RpcComponentScan.class.getName()));
String[] basePackages = {};
if (annotationAttributes != null) {
// 此处去获取RpcComponentScan 注解的 basePackages 值
basePackages = annotationAttributes.getStringArray("basePackages");
}
// 如果没有指定名称的话,默认就用当前类所在包
if (basePackages.length == 0) {
basePackages = new String[]{((StandardAnnotationMetadata) annotationMetadata).getIntrospectedClass().getPackage().getName()};
}
// 创建一个浏览 RpcService 注解的 Scanner
RpcClassPathBeanDefinitionScanner rpcServiceScanner = new RpcClassPathBeanDefinitionScanner(registry, RpcService.class);

if (this.resourceLoader != null) {
rpcServiceScanner.setResourceLoader(this.resourceLoader);
}

// 扫描包下的所有 Rpc bean 并返回注册成功的数量(scan方法会调用register方法去注册扫描到的类并生成 BeanDefinition 注册到 spring 容器)
int count = rpcServiceScanner.scan(basePackages);
log.info("The number of BeanDefinition scanned and registered by RpcServiceScanner is {}.", count);
}
}

编写RpcComponentScan注解类,调用上述具体实现类:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(RpcBeanDefinitionRegistrar.class)
public @interface RpcComponentScan {

/**
* 扫描包路径
*/
@AliasFor("basePackages")
String[] value() default {};

/**
* 扫描包路径
*/
@AliasFor("value")
String[] basePackages() default {};
}

现在经过调用链,服务已经被注册到Spring中了,接下来利用Spring框架自动将服务信息注册到注册中心:

public class RpcServerBeanPostProcessor implements BeanPostProcessor, CommandLineRunner {

private final ServiceRegistry serviceRegistry;

private final RpcServer rpcServer;

private final RpcServerProperties properties;

public RpcServerBeanPostProcessor(ServiceRegistry serviceRegistry, RpcServer rpcServer, RpcServerProperties properties) {
this.serviceRegistry = serviceRegistry;
this.rpcServer = rpcServer;
this.properties = properties;
}

/**
* 在 bean 实例化后,初始化后,检测标注有 @RpcService 注解的类,将对应的服务类进行注册,对外暴露服务,同时进行本地服务注册
*
* @param bean bean
* @param beanName beanName
* @return 返回增强后的 bean
* @throws BeansException Bean 异常
*/
@SneakyThrows
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
// 判断当前 bean 是否被 @RpcService 注解标注
if (bean.getClass().isAnnotationPresent(RpcService.class)) {
log.info("[{}] is annotated with [{}].", bean.getClass().getName(), RpcService.class.getCanonicalName());
// 获取到该类的 @RpcService 注解
RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
String interfaceName;
if ("".equals(rpcService.interfaceName())) {
interfaceName = rpcService.interfaceClass().getName();
} else {
interfaceName = rpcService.interfaceName();
}
String version = rpcService.version();
String serviceName = ServiceUtil.serviceKey(interfaceName, version);
// 构建 ServiceInfo 对象
ServiceInfo serviceInfo = ServiceInfo.builder()
.appName(properties.getAppName())
.serviceName(serviceName)
.version(version)
.address(properties.getAddress())
.port(properties.getPort())
.build();
// 进行远程服务注册
serviceRegistry.register(serviceInfo);
// 进行本地服务缓存注册
LocalServiceCache.addService(serviceName, bean);
}
return bean;
}

/**
* 开机自启动 - 此方法实现于 {@link CommandLineRunner} 接口,基于 springboot
*
* @param args incoming main method arguments 命令行参数
* @throws Exception 启动异常
*/
@Override
public void run(String... args) throws Exception {
new Thread(() -> rpcServer.start(properties.getPort())).start();
log.info("Rpc server [{}] start, the appName is {}, the port is {}",
rpcServer, properties.getAppName(), properties.getPort());
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
// 当服务关闭之后,将服务从 注册中心 上清除(关闭连接)
serviceRegistry.destroy();
} catch (Exception e) {
throw new RuntimeException(e);
}
}));
}
}

上述两个注解是服务提供方的,下面实现服务调用方的注解RpcReference,用于标记需要动态代理生成远程服务调用客户端的字段或方法

@Target({ElementType.FIELD, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RpcReference {

/**
* 对外暴露服务的接口类型,默认为 void.class
*/
Class<?> interfaceClass() default void.class;

/**
* 对外暴露服务的接口名(全限定名),默认为 ""
*/
String interfaceName() default "";

/**
* 版本号,默认 1.0
*/
String version() default "1.0";

/**
* 负载均衡策略,合法的值包括:random, roundrobin, leastactive
*/
String loadbalance() default "";

/**
* Service mock name, use interface name + Mock if not set
*/
String mock() default "";

/**
* 服务调用超时时间
*/
int timeout() default 0;
}

然后编写打上注解标签后的实现细节,在Spring完成Bean实例化之后,扫描Bean中标注了@RpcReference注解的字段,并将这些字段替换为对应的客户端代理对象:

public class RpcClientBeanPostProcessor implements BeanPostProcessor {

private final ClientStubProxyFactory proxyFactory;

public RpcClientBeanPostProcessor(ClientStubProxyFactory proxyFactory) {
this.proxyFactory = proxyFactory;
}

/**
* 在bean实例化完后,扫描bean中需要进行rpc注入的属性,将对应的属性使用代理对象进行替换
*
* @param bean bean 对象
* @param beanName bean 名称
* @return 后置增强后的 bean 对象
* @throws BeansException bean 异常
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
// 获取该 bean 的类的所有属性(getFields - 获取所有的public属性,getDeclaredFields - 获取所有声明的属性,不区分访问修饰符)
Field[] fields = bean.getClass().getDeclaredFields();
// 遍历所有属性
for (Field field : fields) {
// 判断是否被 RpcReference 注解标注
if (field.isAnnotationPresent(RpcReference.class)) {
// 获得 RpcReference 注解
RpcReference rpcReference = field.getAnnotation(RpcReference.class);
// 默认类为属性当前类型
// filed.class = java.lang.reflect.Field
// filed.type = com.wxy.xxx.service.XxxService
Class<?> clazz = field.getType();
try {
// 如果指定了全限定类型接口名
if (!"".equals(rpcReference.interfaceName())) {
clazz = Class.forName(rpcReference.interfaceName());
}
// 如果指定了接口类型
if (rpcReference.interfaceClass() != void.class) {
clazz = rpcReference.interfaceClass();
}
// 获取指定类型的代理对象
Object proxy = proxyFactory.getProxy(clazz, rpcReference.version());
// 关闭安全检查
field.setAccessible(true);
// 设置域的值为代理对象
field.set(bean, proxy);
} catch (ClassNotFoundException | IllegalAccessException e) {
throw new RpcException(String.format("Failed to obtain proxy object, the type of field %s is %s, " +
"and the specified loaded proxy type is %s.", field.getName(), field.getClass(), clazz), e);
}
}
}
return bean;
}
}

集成SpringBoot实现自动装配

编写对应的自动配置的配置类以及 spring.factories 文件,引入对应的starter即可完成自动配置功能。

server端的配置类,主要作用有

  • 将服务注册到注册中心
  • 启动RPC服务,监听请求连接
  • 开启Bean后处理器
@Configuration
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcServerAutoConfiguration {

@Autowired
RpcServerProperties properties;

/**
* 创建 ServiceRegistry 实例 bean,当没有配置时默认使用 zookeeper 作为配置中心
*/
@Bean(name = "serviceRegistry")
@Primary
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "rpc.server", name = "registry", havingValue = "zookeeper", matchIfMissing = true)
public ServiceRegistry zookeeperServiceRegistry() {
return new ZookeeperServiceRegistry(properties.getRegistryAddr());
}

@Bean(name = "serviceRegistry")
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "rpc.server", name = "registry", havingValue = "nacos")
public ServiceRegistry nacosServiceRegistry() {
return new NacosServiceRegistry(properties.getRegistryAddr());
}

// 当没有配置通信协议属性时,默认使用 netty 作为通讯协议
@Bean(name = "rpcServer")
@Primary
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "rpc.server", name = "transport", havingValue = "netty", matchIfMissing = true)
public RpcServer nettyRpcServer() {
return new NettyRpcServer();
}

@Bean(name = "rpcServer")
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "rpc.server", name = "transport", havingValue = "http")
@ConditionalOnClass(name = {"org.apache.catalina.startup.Tomcat"})
public RpcServer httpRpcServer() {
return new HttpRpcServer();
}

@Bean(name = "rpcServer")
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "rpc.server", name = "transport", havingValue = "socket")
public RpcServer socketRpcServer() {
return new SocketRpcServer();
}

@Bean
@ConditionalOnMissingBean
@ConditionalOnBean({ServiceRegistry.class, RpcServer.class})
public RpcServerBeanPostProcessor rpcServerBeanPostProcessor(@Autowired ServiceRegistry serviceRegistry,
@Autowired RpcServer rpcServer,
@Autowired RpcServerProperties properties) {

return new RpcServerBeanPostProcessor(serviceRegistry, rpcServer, properties);
}
}

client的自动配置类,主要作用有:

  • 配置负载均衡算法
  • 服务发现
  • 客户端网络通信,用来发送和接收响应
  • 动态代理类
  • 开启Bean后处理器
  • 退出时清理组件
@Configuration
@EnableConfigurationProperties(RpcClientProperties.class)
public class RpcClientAutoConfiguration {

/**
* 属性绑定的实现方式二:
* - 创建 RpcClientProperties 对象,绑定到配置文件
* - 如果使用此方法,可以直接给属性赋初始值
*
* @param environment 当前应用的环境(支持 yaml、properties 等文件格式)
* @return 返回对应的绑定属性类
* @deprecated 弃用,使用被 {@link org.springframework.boot.context.properties.ConfigurationProperties} 标注的属性类代替,
* 生成 metadata。
*/
// @Bean
@Deprecated
public RpcClientProperties rpcClientProperties(Environment environment) {
// 获取绑定器,将对应的属性绑定到指定类上
BindResult<RpcClientProperties> bind = Binder.get(environment).bind("rpc.client", RpcClientProperties.class);
// 获取实例
return bind.get();
}

@Autowired
RpcClientProperties rpcClientProperties;

@Bean(name = "loadBalance")
@Primary
@ConditionalOnMissingBean // 不指定 value 则值默认为当前创建的类
@ConditionalOnProperty(prefix = "rpc.client", name = "loadbalance", havingValue = "random", matchIfMissing = true)
public LoadBalance randomLoadBalance() {
return new RandomLoadBalance();
}

@Bean(name = "loadBalance")
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "rpc.client", name = "loadbalance", havingValue = "roundRobin")
public LoadBalance roundRobinLoadBalance() {
return new RoundRobinLoadBalance();
}

@Bean(name = "loadBalance")
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "rpc.client", name = "loadbalance", havingValue = "consistentHash")
public LoadBalance consistentHashLoadBalance() {
return new ConsistentHashLoadBalance();
}

@Bean(name = "serviceDiscovery")
@Primary
@ConditionalOnMissingBean
@ConditionalOnBean(LoadBalance.class)
@ConditionalOnProperty(prefix = "rpc.client", name = "registry", havingValue = "zookeeper", matchIfMissing = true)
public ServiceDiscovery zookeeperServiceDiscovery(@Autowired LoadBalance loadBalance) {
return new ZookeeperServiceDiscovery(rpcClientProperties.getRegistryAddr(), loadBalance);
}

@Bean(name = "serviceDiscovery")
@ConditionalOnMissingBean
@ConditionalOnBean(LoadBalance.class)
@ConditionalOnProperty(prefix = "rpc.client", name = "registry", havingValue = "nacos")
public ServiceDiscovery nacosServiceDiscovery(@Autowired LoadBalance loadBalance) {
return new NacosServiceDiscovery(rpcClientProperties.getRegistryAddr(), loadBalance);
}

@Bean(name = "rpcClient")
@Primary
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "rpc.client", name = "transport", havingValue = "netty", matchIfMissing = true)
public RpcClient nettyRpcClient() {
return new NettyRpcClient();
}

@Bean(name = "rpcClient")
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "rpc.client", name = "transport", havingValue = "http")
public RpcClient httpRpcClient() {
return new HttpRpcClient();
}

@Bean(name = "rpcClient")
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "rpc.client", name = "transport", havingValue = "socket")
public RpcClient socketRpcClient() {
return new SocketRpcClient();
}

@Bean
@ConditionalOnMissingBean
@ConditionalOnBean({ServiceDiscovery.class, RpcClient.class})
public ClientStubProxyFactory clientStubProxyFactory(@Autowired ServiceDiscovery serviceDiscovery,
@Autowired RpcClient rpcClient,
@Autowired RpcClientProperties rpcClientProperties) {
return new ClientStubProxyFactory(serviceDiscovery, rpcClient, rpcClientProperties);
}

@Bean
@ConditionalOnMissingBean
public RpcClientBeanPostProcessor rpcClientBeanPostProcessor(@Autowired ClientStubProxyFactory clientStubProxyFactory) {
return new RpcClientBeanPostProcessor(clientStubProxyFactory);
}

@Bean
@ConditionalOnMissingBean
public RpcClientExitDisposableBean rpcClientExitDisposableBean(@Autowired ServiceDiscovery serviceDiscovery) {
return new RpcClientExitDisposableBean(serviceDiscovery);
}
}