RPC, short for Remote Procedure Call, solves the problem of inter-service invocation in distributed systems. Put simply, it lets developers call a remote service as if it were a local method.

An RPC framework generally consists of three components: the client, the server, and the registry. A complete RPC call usually proceeds as follows:

1. After starting, the server publishes the list of services it provides to the registry (service registration)
2. The client subscribes to the relevant service addresses from the registry (service subscription)
3. The client initiates the remote call through a local proxy module (Proxy), which converts the invoked method, its parameters, and other data into a network byte stream
4. The client selects one address from the service list according to its load-balancing strategy and sends the data to the server over the network
5. The server receives the data, invokes the corresponding service, and returns the result to the client over the network

Although this call flow is easy to follow, implementing a complete RPC framework involves a lot of machinery: service registration and discovery, wire protocol and serialization, load balancing, dynamic proxies, and more.
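To make "calling remote like local" concrete, here is a minimal caller-side sketch. HelloService and its usage are hypothetical examples; getProxy(Class, version) matches the ClientStubProxyFactory shown later in this post:

```java
// Hypothetical service interface shared between client and server modules.
interface HelloService {
    String sayHello(String name);
}

// Caller-side usage: the framework hands back a proxy implementing the interface
// and forwards every call over the network.
public class HelloClient {

    private final ClientStubProxyFactory proxyFactory;

    public HelloClient(ClientStubProxyFactory proxyFactory) {
        this.proxyFactory = proxyFactory;
    }

    public String greet() {
        // getProxy(Class, version) is the factory method shown in the Dynamic Proxy section
        HelloService helloService = proxyFactory.getProxy(HelloService.class, "1.0");
        return helloService.sayHello("world"); // looks local, executes remotely
    }
}
```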
Project Goals

- Network communication over three transports: Netty, Socket, and HTTP
- A custom message protocol with matching encoder and decoder
- Five serialization algorithms (JDK, JSON, Hessian, Kryo, Protostuff)
- Three load-balancing algorithms (RoundRobin, Random, ConsistentHash)
- Two dynamic proxy implementations (JDK, CGLIB)
- Zookeeper-based service registration and discovery, with a local service cache and change listeners
- Spring integration: custom annotations for RPC component scanning, service registration, and service consumption
- Spring Boot integration with auto-configuration
- A Netty heartbeat mechanism and Channel connection reuse
- A custom SPI mechanism
Codec Module

Handling sticky and half packets

TCP sticky/half packets are solved here with length-field framing (message length + message content): each message is split into a header and a body, and the header records the body length.

```java
public class RpcFrameDecoder extends LengthFieldBasedFrameDecoder {

    public RpcFrameDecoder() {
        // maxFrameLength = 16384, lengthFieldOffset = 12, lengthFieldLength = 4
        this(16384, 12, 4);
    }

    public RpcFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
    }
}
```

Note that the maximum frame length given here is 16384 bytes (16 KB); any message larger than that makes the decoder throw an exception, and the message cannot be received.

It is generally advised not to raise this limit casually beyond about 8 MB, since an overly large frame limit can cause excessive memory allocation and opens the door to memory-exhaustion attacks.
Codec implementation

Encoding is outbound processing. We need an RpcMessage made up of a header and a body; the header follows a custom protocol:

```
-----------------------------------------------------------------------------
| magic number (4 B) | version (1 B)     | serializer (1 B) | msg type (1 B) |
-----------------------------------------------------------------------------
| msg status (1 B)   | sequence id (4 B) | body length (4 B)                 |
-----------------------------------------------------------------------------
| message body (variable length)                                             |
-----------------------------------------------------------------------------
```

The fixed header is 16 bytes in total, which is exactly the length-field offset (12) and length-field size (4) configured in the sticky/half-packet decoder above.

The body is serialized with the chosen serialization algorithm and passed down the pipeline.

For decoding, the magic number is validated first, then the rest of the header is read; the body is then parsed with the matching deserialization algorithm to obtain the payload, which is passed down the pipeline.
```java
@Sharable
public class SharableRpcMessageCodec extends MessageToMessageCodec<ByteBuf, RpcMessage> {

    @Override
    protected void encode(ChannelHandlerContext ctx, RpcMessage msg, List<Object> out) throws Exception {
        ByteBuf buf = ctx.alloc().buffer();
        MessageHeader header = msg.getHeader();
        buf.writeBytes(header.getMagicNum());
        buf.writeByte(header.getVersion());
        buf.writeByte(header.getSerializerType());
        buf.writeByte(header.getMessageType());
        buf.writeByte(header.getMessageStatus());
        buf.writeInt(header.getSequenceId());

        Object body = msg.getBody();
        Serialization serialization = SerializationFactory
                .getSerialization(SerializationType.parseByType(header.getSerializerType()));
        byte[] bytes = serialization.serialize(body);
        header.setLength(bytes.length);
        buf.writeInt(header.getLength());
        buf.writeBytes(bytes);
        out.add(buf);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        int len = ProtocolConstants.MAGIC_NUM.length;
        byte[] magicNum = new byte[len];
        msg.readBytes(magicNum, 0, len);
        for (int i = 0; i < len; i++) {
            if (magicNum[i] != ProtocolConstants.MAGIC_NUM[i]) {
                throw new IllegalArgumentException("Unknown magic code: " + Arrays.toString(magicNum));
            }
        }

        byte version = msg.readByte();
        if (version != ProtocolConstants.VERSION) {
            throw new IllegalArgumentException("The version isn't compatible " + version);
        }

        byte serializeType = msg.readByte();
        byte messageType = msg.readByte();
        byte messageStatus = msg.readByte();
        int sequenceId = msg.readInt();
        int length = msg.readInt();
        byte[] bytes = new byte[length];
        msg.readBytes(bytes, 0, length);

        MessageHeader header = MessageHeader.builder()
                .magicNum(magicNum)
                .version(version)
                .serializerType(serializeType)
                .messageType(messageType)
                .sequenceId(sequenceId)
                .messageStatus(messageStatus)
                .length(length).build();

        Serialization serialization = SerializationFactory
                .getSerialization(SerializationType.parseByType(serializeType));
        MessageType type = MessageType.parseByType(messageType);
        RpcMessage protocol = new RpcMessage();
        protocol.setHeader(header);
        if (type == MessageType.REQUEST) {
            RpcRequest request = serialization.deserialize(RpcRequest.class, bytes);
            protocol.setBody(request);
        } else if (type == MessageType.RESPONSE) {
            RpcResponse response = serialization.deserialize(RpcResponse.class, bytes);
            protocol.setBody(response);
        } else if (type == MessageType.HEARTBEAT_REQUEST || type == MessageType.HEARTBEAT_RESPONSE) {
            String message = serialization.deserialize(String.class, bytes);
            protocol.setBody(message);
        }
        out.add(protocol);
    }
}
```
Serialization Algorithms

The client and server must exchange data, but objects cannot be sent over the wire as-is; the data has to be encoded. So how should we encode and decode it?

With TCP, we serialize the invoked interface, method, request parameters, invocation attributes, and so on into a binary byte stream and send it to the service provider. The server deserializes the byte stream back into the invocation information, invokes the target method via reflection, and finally returns the result, return code, exception info, etc. to the client. Serialization and deserialization are simply the conversions between objects and binary streams. Because network communication relies on byte streams and the request payloads are open-ended, a general-purpose and efficient serialization algorithm is usually chosen. Commonly used options include FastJson, Kryo, Hessian, and Protobuf; all of these third-party algorithms are more efficient than Java's built-in serialization.

Dubbo supports multiple serialization algorithms and defines a Serialization interface specification that every serialization extension must implement; Hessian is its default.
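This project uses the same kind of abstraction. A minimal sketch of the interface, reconstructed from the method signatures of the implementations shown below:

```java
public interface Serialization {

    // Serialize an object into a byte array.
    <T> byte[] serialize(T object);

    // Deserialize a byte array back into an instance of the given class.
    <T> T deserialize(Class<T> clazz, byte[] bytes);
}
```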
Serialization is just as critical to a remote call's response time, throughput, and network bandwidth consumption; it is one of the most decisive factors in distributed-system performance.

A codec framework is mainly judged on the following aspects:

- whether it is cross-language, and how many languages it supports
- the size of the encoded byte stream
- encoding/decoding performance
- whether the library is lightweight and its API easy to use
- the amount of work and difficulty it imposes on developers

This project implements five serialization algorithms: JDK, JSON, Hessian, Kryo, and Protostuff; the JSON variant is implemented with Gson.
The five algorithms compare as follows:

| Algorithm | Pros | Cons |
| --- | --- | --- |
| Kryo | Fast; compact serialized output | Cross-language support is complicated |
| Hessian | Cross-language out of the box | Relatively slow |
| Protostuff | Fast; based on Protobuf | Requires static compilation |
| JSON | Easy to use | Average performance |
| JDK | Easy to use; handles any Serializable class | Slow; large output |
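The codec shown earlier obtains an implementation through SerializationFactory, which is not listed in this post. A minimal sketch, assuming a SerializationType enum with the five constants above:

```java
public final class SerializationFactory {

    private SerializationFactory() {
    }

    public static Serialization getSerialization(SerializationType type) {
        switch (type) {
            case JDK:
                return new JdkSerialization();
            case JSON:
                return new JsonSerialization();
            case HESSIAN:
                return new HessianSerialization();
            case KRYO:
                return new KryoSerialization();
            case PROTOSTUFF:
                return new ProtostuffSerialization();
            default:
                throw new IllegalArgumentException("Unsupported serialization type: " + type);
        }
    }
}
```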
Hessian serialization

```java
public class HessianSerialization implements Serialization {

    @Override
    public <T> byte[] serialize(T object) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            HessianSerializerOutput hso = new HessianSerializerOutput(baos);
            hso.writeObject(object);
            hso.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new SerializeException("Hessian serialize failed.", e);
        }
    }

    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
        try {
            ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
            HessianSerializerInput hsi = new HessianSerializerInput(bis);
            return (T) hsi.readObject();
        } catch (IOException e) {
            throw new SerializeException("Hessian deserialize failed.", e);
        }
    }
}
```
Jdk serialization

```java
public class JdkSerialization implements Serialization {

    @Override
    public <T> byte[] serialize(T object) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(object);
            oos.flush(); // make sure buffered data reaches the byte array
            return baos.toByteArray();
        } catch (IOException e) {
            throw new SerializeException("Jdk serialize failed.", e);
        }
    }

    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
        try {
            ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
            ObjectInputStream ois = new ObjectInputStream(bais);
            return (T) ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new SerializeException("Jdk deserialize failed.", e);
        }
    }
}
```
Json serialization

Gson cannot serialize Class objects out of the box, because Class has no default serialization scheme. As an example, our RpcRequest class looks like this:
```java
@Data
public class RpcRequest implements Serializable {
    private String serviceName;
    private String method;
    private Class<?>[] parameterTypes;
    private Object[] parameterValues;
}
```
Suppose the remote method we want to call is:
```java
public User getUser(String id, int level)
```
Then we would build the RpcRequest like this:

```java
RpcRequest request = new RpcRequest();
request.setMethod("getUser");
request.setParameterTypes(new Class<?>[]{String.class, int.class});
request.setParameterValues(new Object[]{"abc123", 5});
```
To serialize this class with Gson, we register our own JsonSerializer<Class<?>> and JsonDeserializer<Class<?>>. The complete implementation:
```java
public class JsonSerialization implements Serialization {

    // Gson type adapter that (de)serializes Class objects by their fully-qualified name
    static class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {

        @SneakyThrows
        @Override
        public Class<?> deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context)
                throws JsonParseException {
            String name = json.getAsString();
            return Class.forName(name);
        }

        @Override
        public JsonElement serialize(Class<?> src, Type typeOfSrc, JsonSerializationContext context) {
            return new JsonPrimitive(src.getName());
        }
    }

    @Override
    public <T> byte[] serialize(T object) {
        try {
            Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new ClassCodec()).create();
            String json = gson.toJson(object);
            return json.getBytes(StandardCharsets.UTF_8);
        } catch (Exception e) {
            throw new SerializeException("Json serialize failed.", e);
        }
    }

    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
        try {
            Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new ClassCodec()).create();
            String json = new String(bytes, StandardCharsets.UTF_8);
            return gson.fromJson(json, clazz);
        } catch (JsonSyntaxException e) {
            throw new SerializeException("Json deserialize failed.", e);
        }
    }
}
```
Kryo serialization

Why is Kryo not thread-safe, while the other serialization methods here are? That question deserves its own post, coming later.
```java
public class KryoSerialization implements Serialization {

    // Kryo instances are not thread-safe, so each thread keeps its own instance
    private final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
        Kryo kryo = new Kryo();
        kryo.register(RpcRequest.class);
        kryo.register(RpcResponse.class);
        return kryo;
    });

    @Override
    public <T> byte[] serialize(T object) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            Output output = new Output(baos);
            Kryo kryo = kryoThreadLocal.get();
            kryo.writeObject(output, object);
            kryoThreadLocal.remove();
            return output.toBytes();
        } catch (Exception e) {
            throw new SerializeException("Kryo serialize failed.", e);
        }
    }

    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
        try {
            ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
            Input input = new Input(bais);
            Kryo kryo = kryoThreadLocal.get();
            T object = kryo.readObject(input, clazz);
            kryoThreadLocal.remove();
            return object;
        } catch (Exception e) {
            throw new SerializeException("Kryo deserialize failed.", e);
        }
    }
}
```
Protostuff serialization

Protostuff needs a LinkedBuffer as a temporary write area. The project allocates one buffer per thread up front (the BUFFER_THREAD_LOCAL below) instead of reallocating on every call, which improves performance.
```java
public class ProtostuffSerialization implements Serialization {

    // LinkedBuffer is not thread-safe; keep one per thread and clear it after use
    private static final ThreadLocal<LinkedBuffer> BUFFER_THREAD_LOCAL =
            ThreadLocal.withInitial(() -> LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));

    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override
    public <T> byte[] serialize(T object) {
        LinkedBuffer buffer = BUFFER_THREAD_LOCAL.get();
        try {
            Schema schema = RuntimeSchema.getSchema(object.getClass());
            return ProtostuffIOUtil.toByteArray(object, schema, buffer);
        } catch (Exception e) {
            throw new SerializeException("Protostuff serialize failed.", e);
        } finally {
            buffer.clear();
        }
    }

    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
        try {
            Schema<T> schema = RuntimeSchema.getSchema(clazz);
            T object = schema.newMessage();
            ProtostuffIOUtil.mergeFrom(bytes, object, schema);
            return object;
        } catch (Exception e) {
            throw new SerializeException("Protostuff deserialize failed.", e);
        }
    }
}
```
Service Registration and Discovery

How should different services in a distributed system communicate with each other? Traditional approaches, such as calling hard-coded HTTP endpoints or keeping a local copy of the server list, force developers to know exactly what each server exposes and couple the systems tightly. The registry was introduced to decouple client and server and to support graceful service start-up and shutdown. In an RPC framework, the registry provides service registration and discovery: after a server node comes online, it registers its service list with the registry; when a node goes offline, its metadata must be removed. When a client initiates a call, it fetches the provider's service list from the registry and then picks one node with a load-balancing algorithm.

Now consider a question: since a service's metadata must be removed from the registry when it goes offline, how does the registry learn that it went offline? The obvious answer is active notification: when a node is about to shut down, it sends a deregistration request so the registry removes its metadata. But if the node exits abnormally (network failure, process crash, and so on), the registry is left holding stale metadata for a dead node, which can break service calls.

To avoid this, a better route to graceful shutdown is active notification plus heartbeat detection, also called liveness probing. Besides notifying the registry on shutdown, the nodes and the registry keep a heartbeat going; either side can run the check. For example, the registry can send each service node a heartbeat every 60 s, and if three consecutive heartbeats get no response, the node can be considered offline.

A registry decouples the otherwise tangled relationships between clients and servers and enables dynamic service management: service configuration can be changed at runtime and pushed to clients and servers without restarting anything.

This project uses Zookeeper as the registry.
Service registration

Key fields:
```java
private static final int SESSION_TIMEOUT = 60 * 1000;
private static final int CONNECT_TIMEOUT = 15 * 1000;
private static final int BASE_SLEEP_TIME = 3 * 1000;
private static final int MAX_RETRY = 10;
private static final String BASE_PATH = "/rpc";

private CuratorFramework client;
private ServiceDiscovery<ServiceInfo> serviceDiscovery;
```
The constructor takes the Zookeeper address and initializes the client:
```java
public ZookeeperServiceRegistry(String registryAddress) {
    try {
        client = CuratorFrameworkFactory
                .newClient(registryAddress, SESSION_TIMEOUT, CONNECT_TIMEOUT,
                        new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRY));
        client.start();
        serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceInfo.class)
                .client(client)
                .serializer(new JsonInstanceSerializer<>(ServiceInfo.class))
                .basePath(BASE_PATH)
                .build();
        serviceDiscovery.start();
    } catch (Exception e) {
        log.error("An error occurred while starting the zookeeper registry: ", e);
    }
}
```
The register method publishes a service to Zookeeper:
```java
public void register(ServiceInfo serviceInfo) {
    try {
        ServiceInstance<ServiceInfo> serviceInstance = ServiceInstance.<ServiceInfo>builder()
                .name(serviceInfo.getServiceName())
                .address(serviceInfo.getAddress())
                .port(serviceInfo.getPort())
                .payload(serviceInfo)
                .build();
        serviceDiscovery.registerService(serviceInstance);
        log.info("Successfully registered [{}] service.", serviceInstance.getName());
    } catch (Exception e) {
        throw new RpcException(String.format("An error occurred when rpc server registering [%s] service.",
                serviceInfo.getServiceName()), e);
    }
}
```
Service deregistration and cleanup:
```java
public void unregister(ServiceInfo serviceInfo) throws Exception {
    ServiceInstance<ServiceInfo> serviceInstance = ServiceInstance.<ServiceInfo>builder()
            .name(serviceInfo.getServiceName())
            .address(serviceInfo.getAddress())
            .port(serviceInfo.getPort())
            .payload(serviceInfo)
            .build();
    serviceDiscovery.unregisterService(serviceInstance);
    log.warn("Successfully unregistered {} service.", serviceInstance.getName());
}

public void destroy() throws Exception {
    serviceDiscovery.close();
    client.close();
    log.info("Destroy zookeeper registry completed.");
}
```
Service discovery

Key fields:
```java
private static final int SESSION_TIMEOUT = 60 * 1000;
private static final int CONNECT_TIMEOUT = 15 * 1000;
private static final int BASE_SLEEP_TIME = 3 * 1000;
private static final int MAX_RETRY = 10;
private static final String BASE_PATH = "/rpc";

private LoadBalance loadBalance;
private CuratorFramework client;
private org.apache.curator.x.discovery.ServiceDiscovery<ServiceInfo> serviceDiscovery;

private final Map<String, ServiceCache<ServiceInfo>> serviceCacheMap = new ConcurrentHashMap<>();
private final Map<String, List<ServiceInfo>> serviceMap = new ConcurrentHashMap<>();
```
Constructor, initializing the Zookeeper client and the discovery object:
```java
public ZookeeperServiceDiscovery(String registryAddress, LoadBalance loadBalance) {
    try {
        this.loadBalance = loadBalance;
        client = CuratorFrameworkFactory
                .newClient(registryAddress, SESSION_TIMEOUT, CONNECT_TIMEOUT,
                        new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRY));
        client.start();
        serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceInfo.class)
                .client(client)
                .serializer(new JsonInstanceSerializer<>(ServiceInfo.class))
                .basePath(BASE_PATH)
                .build();
        serviceDiscovery.start();
    } catch (Exception e) {
        log.error("An error occurred while starting the zookeeper discovery: ", e);
    }
}
```
Fetching the service list (with a local cache and a change listener):
```java
public List<ServiceInfo> getServices(String serviceName) throws Exception {
    if (!serviceMap.containsKey(serviceName)) {
        ServiceCache<ServiceInfo> serviceCache = serviceDiscovery.serviceCacheBuilder()
                .name(serviceName)
                .build();
        serviceCache.addListener(new ServiceCacheListener() {
            @Override
            public void cacheChanged() {
                log.info("The service [{}] cache has changed. The current number of service samples is {}.",
                        serviceName, serviceCache.getInstances().size());
                serviceMap.put(serviceName, serviceCache.getInstances().stream()
                        .map(ServiceInstance::getPayload)
                        .collect(Collectors.toList()));
            }

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                log.info("The client {} connection status has changed. The current status is: {}.",
                        client, newState);
            }
        });
        serviceCache.start();
        serviceCacheMap.put(serviceName, serviceCache);
        serviceMap.put(serviceName, serviceCacheMap.get(serviceName).getInstances()
                .stream()
                .map(ServiceInstance::getPayload)
                .collect(Collectors.toList()));
    }
    return serviceMap.get(serviceName);
}
```
Selecting a service instance: pick an available ServiceInfo instance by service name plus the load-balancing strategy:
```java
public ServiceInfo discover(RpcRequest request) {
    try {
        return loadBalance.select(getServices(request.getServiceName()), request);
    } catch (Exception e) {
        throw new RpcException(String.format("Remote service discovery did not find service %s.",
                request.getServiceName()), e);
    }
}
```
Finally, clean up all Zookeeper-related resources to ensure a graceful shutdown:
```java
public void destroy() throws Exception {
    for (ServiceCache<ServiceInfo> serviceCache : serviceCacheMap.values()) {
        if (serviceCache != null) {
            serviceCache.close();
        }
    }
    if (serviceDiscovery != null) {
        serviceDiscovery.close();
    }
    if (client != null) {
        client.close();
    }
}
```
Load-Balancing Algorithms

In a distributed system, both service providers and consumers run multiple nodes; how do we keep the load balanced across all of a provider's nodes?

Before initiating a call, the client needs to know which server nodes are available, then pick one. It obtains the nodes' state information and applies one of several strategies:

- Round-Robin: cycle through the server nodes in turn
- Weighted Round-Robin: give each node a weight reflecting its capacity, so weaker or lower-spec nodes receive proportionally less traffic
- Least Connections: balance by each node's current connection count, picking the server with the fewest connections. Connection count is only one server-side dimension; the same idea extends to fewest in-flight requests, lowest CPU utilization, and so on
- Consistent Hash: the mainstream recommendation today. Consistent hashing is a special hash scheme that, when server nodes are added or removed, keeps client requests mapped to the same node as much as possible. It is implemented with a hash ring: objects and server nodes are both placed on the ring by a hash function (servers are typically hashed by IP + port), and an object is assigned to the first server node found clockwise from its hash value

This project implements the Random, Round-Robin, and Consistent Hash strategies.

First, an abstract class defining the load-balancing entry point:
```java
public abstract class AbstractLoadBalance implements LoadBalance {

    @Override
    public ServiceInfo select(List<ServiceInfo> invokers, RpcRequest request) {
        if (invokers == null || invokers.isEmpty()) {
            return null;
        }
        if (invokers.size() == 1) {
            return invokers.get(0);
        }
        return doSelect(invokers, request);
    }

    protected abstract ServiceInfo doSelect(List<ServiceInfo> invokers, RpcRequest request);
}
```
Random load balancing

Simply pick a random node:
```java
public class RandomLoadBalance extends AbstractLoadBalance {

    final Random random = new Random();

    @Override
    protected ServiceInfo doSelect(List<ServiceInfo> invokers, RpcRequest request) {
        return invokers.get(random.nextInt(invokers.size()));
    }
}
```
Round-robin load balancing:

```java
public class RoundRobinLoadBalance extends AbstractLoadBalance {

    private static final AtomicInteger atomicInteger = new AtomicInteger(0);

    @Override
    public ServiceInfo doSelect(List<ServiceInfo> invokers, RpcRequest request) {
        return invokers.get(getAndIncrement() % invokers.size());
    }

    // increment atomically, wrapping back to 0 at Integer.MAX_VALUE to avoid a negative index
    public final int getAndIncrement() {
        int prev, next;
        do {
            prev = atomicInteger.get();
            next = prev == Integer.MAX_VALUE ? 0 : prev + 1;
        } while (!atomicInteger.compareAndSet(prev, next));
        return prev;
    }
}
```
Consistent-hash load balancing

In a distributed system, load balancing is the key mechanism for spreading requests over many service nodes. With naive hashing (for example hash(key) % nodeCount), any change to the node set (a node added or taken offline) reshuffles the assignment of nearly every request, which causes:

- a key that used to be cached on node A may suddenly be routed to node B, invalidating the cache
- loss of user affinity; sessions are dropped
- a sharp drop in overall system efficiency

So what we want is: when the node count changes, the existing assignment of requests should be disturbed as little as possible.

Consistent hashing was designed exactly for this, with two goals:

- the same request key always routes to the same node (request consistency)
- when nodes change, only a small minority of keys are reassigned (low disruption)
The basics of consistent hashing

Picture the hash space as a ring: with hash values ranging over 0 to 2^32 - 1, draw them as a circle. Every node (service instance) is mapped by the hash function to some position on the ring, and every request is mapped onto the ring as well: hashing its key yields a value, which is a point on the ring.

How is the target node chosen? Starting from the request's point on the ring, walk clockwise until the first node is found; that node is the one the request is assigned to.

An example. Suppose the ring holds three nodes:

- A → hash 1000
- B → hash 4000
- C → hash 7000

A request key with hash 4200 lands on C (7000). A key with hash 8000 finds no node with a larger hash on the ring, so the search wraps around to the start; the first node is A (1000), so the request goes to A.
Virtual nodes

When there are only a few real service nodes (say 3 to 5), the hash distribution is uneven and one node can end up absorbing a large share of the requests, unbalancing the load. The virtual-node technique fixes this:

- each real node is mapped to many virtual nodes (commonly 160)
- each virtual node is hashed onto the ring as well
- a request still finds the nearest virtual node by hash value, which is then resolved to the real node that owns it

The effect: requests are spread evenly across the many virtual nodes, which indirectly balances the load across the real nodes.

Handling node changes

Adding a node: a batch of new virtual nodes joins the ring, each one "cutting off" a small arc of the request range that previously belonged to other nodes. Only requests in those arcs are reassigned; the vast majority of keys keep their original node.

Removing a node: its virtual nodes leave the ring, and the requests they owned route clockwise to the next node. Again, only a small fraction of requests is affected.
Implementation

A static inner class implements the core of consistent hashing: it maintains the ring of virtual nodes that provides the balance and low disruption described above:
```java
private final static class ConsistentHashSelector {

    // the ring: virtual-node hash -> real service instance
    private final TreeMap<Long, ServiceInfo> virtualInvokers;
    private final int identityHashCode;

    public ConsistentHashSelector(List<ServiceInfo> invokers, int replicaNumber, int identityHashCode) {
        this.virtualInvokers = new TreeMap<>();
        this.identityHashCode = identityHashCode;
        for (ServiceInfo invoker : invokers) {
            String address = invoker.getAddress();
            // each MD5 digest yields 4 virtual-node hashes, so loop replicaNumber / 4 times
            for (int i = 0; i < replicaNumber / 4; i++) {
                byte[] digest = md5(address + i);
                for (int h = 0; h < 4; h++) {
                    long m = hash(digest, h);
                    virtualInvokers.put(m, invoker);
                }
            }
        }
    }

    private byte[] md5(String key) {
        MessageDigest md;
        try {
            md = MessageDigest.getInstance("MD5");
            byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
            md.update(bytes);
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        }
        return md.digest();
    }

    // extract the number-th 4-byte group of the digest as an unsigned 32-bit value
    private long hash(byte[] digest, int number) {
        return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                | (digest[number * 4] & 0xFF))
                & 0xFFFFFFFFL;
    }

    public ServiceInfo select(String key) {
        byte[] digest = md5(key);
        return selectForKey(hash(digest, 0));
    }

    // walk clockwise: first entry with hash >= key, wrapping around to the first entry
    private ServiceInfo selectForKey(long hash) {
        Map.Entry<Long, ServiceInfo> entry = virtualInvokers.ceilingEntry(hash);
        if (entry == null) {
            entry = virtualInvokers.firstEntry();
        }
        return entry.getValue();
    }
}
```
The hashing procedure:

1. Generate an MD5 digest: hash the IP address + port (plus a replica index) with MD5, producing a 16-byte digest
2. Extract several unsigned 32-bit integers (the virtual-node hash values) from the digest:
   - each extraction takes 4 consecutive bytes of the 16-byte digest
   - those 4 bytes are stitched into one 32-bit integer with bit operations, giving one hash value
   - doing this 4 times yields 4 hash values per digest
The doSelect implementation:
```java
public ServiceInfo doSelect(List<ServiceInfo> invokers, RpcRequest request) {
    String method = request.getMethod();
    String key = request.getServiceName() + "." + method;
    // the identity hash of the invoker list detects membership changes and triggers a selector rebuild
    int identityHashCode = System.identityHashCode(invokers);
    ConsistentHashSelector selector = selectors.get(key);
    if (selector == null || selector.identityHashCode != identityHashCode) {
        selectors.put(key, new ConsistentHashSelector(invokers, 160, identityHashCode));
        selector = selectors.get(key);
    }
    String selectKey = key;
    if (request.getParameterValues() != null && request.getParameterValues().length > 0) {
        // fold the argument values into the key so identical calls hash to the same node
        selectKey += Arrays.toString(request.getParameterValues());
    }
    return selector.select(selectKey);
}
```
It builds a hash key from the service name, method name, and parameters, fetches the consistent-hash selector from a cache, and lets the selector route requests with identical parameters to the same service node every time: parameter-based consistent-hash load balancing.
Network Communication

This project implements three network transports: Netty, HTTP, and Socket. The differences between them will be covered in a separate article.

Netty transport

First the communication entry point: a TCP server started with Netty, listening on the given port:
```java
public class NettyRpcServer implements RpcServer {

    @SneakyThrows
    @Override
    public void start(Integer port) {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            InetAddress inetAddress = InetAddress.getLocalHost();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // close the connection if nothing is read for 30 seconds
                            ch.pipeline().addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
                            ch.pipeline().addLast(new RpcFrameDecoder());
                            ch.pipeline().addLast(new SharableRpcMessageCodec());
                            ch.pipeline().addLast(new NettyRpcRequestHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(inetAddress, port).sync();
            log.debug("Rpc server add {} started on the port {}.", inetAddress, port);
            channelFuture.channel().closeFuture().sync();
        } catch (UnknownHostException | InterruptedException e) {
            log.error("An error occurred while starting the rpc service.", e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
```
The last handler in the pipeline is NettyRpcRequestHandler, the concrete business handler:
```java
public class NettyRpcRequestHandler extends SimpleChannelInboundHandler<RpcMessage> {

    private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 20, 60L,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000));

    private final RpcRequestHandler rpcRequestHandler;

    public NettyRpcRequestHandler() {
        this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcMessage msg) throws Exception {
        threadPool.submit(() -> {
            try {
                RpcMessage responseRpcMessage = new RpcMessage();
                MessageHeader header = msg.getHeader();
                MessageType type = MessageType.parseByType(header.getMessageType());
                log.debug("The message received by the server is: {}", msg.getBody());
                if (type == MessageType.HEARTBEAT_REQUEST) {
                    header.setMessageType(MessageType.HEARTBEAT_RESPONSE.getType());
                    header.setMessageStatus(MessageStatus.SUCCESS.getCode());
                    responseRpcMessage.setHeader(header);
                    responseRpcMessage.setBody(ProtocolConstants.PONG);
                } else {
                    RpcRequest request = (RpcRequest) msg.getBody();
                    RpcResponse response = new RpcResponse();
                    header.setMessageType(MessageType.RESPONSE.getType());
                    try {
                        Object result = rpcRequestHandler.handleRpcRequest(request);
                        response.setReturnValue(result);
                        header.setMessageStatus(MessageStatus.SUCCESS.getCode());
                    } catch (Exception e) {
                        log.error("The service [{}], the method [{}] invoke failed!",
                                request.getServiceName(), request.getMethod());
                        response.setExceptionValue(new RpcException("Error in remote procedure call, " + e.getMessage()));
                        header.setMessageStatus(MessageStatus.FAIL.getCode());
                    }
                    responseRpcMessage.setHeader(header);
                    responseRpcMessage.setBody(response);
                }
                log.debug("responseRpcMessage: {}.", responseRpcMessage);
                ctx.writeAndFlush(responseRpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } finally {
                ReferenceCountUtil.release(msg);
            }
        });
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.READER_IDLE) {
                log.warn("idle check happen, so close the connection.");
                ctx.close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("server catch exception");
        cause.printStackTrace();
        ctx.close();
    }
}
```
HTTP transport

The HTTP server implementation uses an embedded Tomcat:
```java
public class HttpRpcServer implements RpcServer {

    @Override
    public void start(Integer port) {
        try {
            Tomcat tomcat = new Tomcat();
            Server server = tomcat.getServer();
            Service service = server.findService("Tomcat");

            Connector connector = new Connector();
            connector.setPort(port);

            String hostname = InetAddress.getLocalHost().getHostAddress();
            StandardEngine engine = new StandardEngine();
            engine.setDefaultHost(hostname);

            StandardHost host = new StandardHost();
            host.setName(hostname);

            String contextPath = "";
            Context context = new StandardContext();
            context.setPath(contextPath);
            context.addLifecycleListener(new Tomcat.FixContextListener());

            host.addChild(context);
            engine.addChild(host);

            service.setContainer(engine);
            service.addConnector(connector);

            tomcat.addServlet(contextPath, "dispatcher", new DispatcherServlet());
            context.addServletMappingDecoded("/*", "dispatcher");

            tomcat.start();
            tomcat.getServer().await();
        } catch (LifecycleException | UnknownHostException e) {
            throw new RpcException("Tomcat server failed to start.", e);
        }
    }
}
```
When Tomcat receives an HTTP request, it hands it to a DispatcherServlet for unified handling:
```java
public class DispatcherServlet extends HttpServlet {

    private final int cpuNum = Runtime.getRuntime().availableProcessors();

    private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(cpuNum * 2, cpuNum * 2,
            60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000));

    @Override
    protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        HttpRpcRequestHandler handler = SingletonFactory.getInstance(HttpRpcRequestHandler.class);
        threadPool.submit(() -> handler.handle(req, resp));
    }
}
```
The DispatcherServlet then delegates to HttpRpcRequestHandler for the actual business processing:
```java
public class HttpRpcRequestHandler {

    private final RpcRequestHandler rpcRequestHandler;

    public HttpRpcRequestHandler() {
        this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class);
    }

    public void handle(HttpServletRequest req, HttpServletResponse resp) {
        try {
            ObjectInputStream ois = new ObjectInputStream(req.getInputStream());
            ObjectOutputStream oos = new ObjectOutputStream(resp.getOutputStream());
            RpcRequest request = (RpcRequest) ois.readObject();
            log.debug("The server received message is {}.", request);
            RpcResponse response = new RpcResponse();
            try {
                Object result = rpcRequestHandler.handleRpcRequest(request);
                response.setReturnValue(result);
            } catch (Exception e) {
                log.error("The service [{}], the method [{}] invoke failed!",
                        request.getServiceName(), request.getMethod());
                response.setExceptionValue(new RpcException("Error in remote procedure call, " + e.getMessage()));
            }
            log.debug("The response is {}.", response);
            oos.writeObject(response);
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("The http server failed to handle client rpc request.", e);
        }
    }
}
```
Socket transport

Plain sockets are the most primitive of the three transports. First the server class:
```java
public class SocketRpcServer implements RpcServer {

    private final int cpuNum = Runtime.getRuntime().availableProcessors();

    private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(cpuNum * 2, cpuNum * 2,
            60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10000));

    @Override
    public void start(Integer port) {
        try (ServerSocket serverSocket = new ServerSocket()) {
            String hostAddress = InetAddress.getLocalHost().getHostAddress();
            serverSocket.bind(new InetSocketAddress(hostAddress, port));
            Socket socket;
            while ((socket = serverSocket.accept()) != null) {
                log.debug("The client connected [{}].", socket.getInetAddress());
                threadPool.execute(new SocketRpcRequestHandler(socket));
            }
            threadPool.shutdown();
        } catch (IOException e) {
            throw new RpcException(String.format("The socket server failed to start on port %d.", port), e);
        }
    }
}
```
Then the concrete request-handling logic, which implements the Runnable interface so the thread pool can execute one task per client connection:
```java
public class SocketRpcRequestHandler implements Runnable {

    private final Socket socket;
    private final RpcRequestHandler rpcRequestHandler;

    public SocketRpcRequestHandler(Socket socket) {
        this.socket = socket;
        this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class);
    }

    @Override
    public void run() {
        log.debug("The server handle client message by thread {}.", Thread.currentThread().getName());
        try (ObjectInputStream ois = new ObjectInputStream(socket.getInputStream())) {
            ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
            RpcRequest request = (RpcRequest) ois.readObject();
            log.debug("The server received message is {}.", request);
            RpcResponse response = new RpcResponse();
            try {
                Object result = rpcRequestHandler.handleRpcRequest(request);
                response.setReturnValue(result);
            } catch (Exception e) {
                log.error("The service [{}], the method [{}] invoke failed!",
                        request.getServiceName(), request.getMethod());
                response.setExceptionValue(new RpcException("Error in remote procedure call, " + e.getMessage()));
            }
            log.debug("The response is {}.", response);
            oos.writeObject(response);
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("The socket server failed to handle client rpc request.", e);
        }
    }
}
```
Dynamic Proxy

How does an RPC framework let you call a remote service as if it were a local interface? It relies on dynamic proxies.

We need a proxy object that encodes the request into the wire format and sends it to the service provider, shielding callers from the RPC framework's invocation details. Because proxy classes are generated at runtime, the generation speed and the size of the generated bytecode both affect the framework's overall performance and resource consumption.

The mainstream implementation options are JDK, CGLIB, Javassist, ASM, and Byte Buddy:

- JDK: creates proxy classes dynamically at runtime, but is fairly limited; the proxied object must implement an interface, otherwise an exception is thrown. The generated proxy class extends Proxy, and since Java does not support multiple inheritance, polymorphism must come through the interface: the proxy is an implementation of the interface and cannot proxy methods that are not declared on it. JDK proxies also dispatch through reflection, which is necessarily slower than a direct call
- CGLIB: built on the ASM bytecode framework. Because proxy classes are generated as bytecode, the proxied type is unrestricted, and the generated proxy subclasses the target, allowing more flexible behavior. For method dispatch CGLIB has an edge: its FastClass mechanism creates a companion class for both the proxy and the target that assigns an index to every method, so FastClass can locate and invoke a method directly by index, a classic space-for-time trade-off
- Javassist and ASM: both are Java bytecode manipulation frameworks. They are harder to use and require an understanding of the class-file format and the JVM, but both outperform reflection
- Byte Buddy: also a bytecode generation and manipulation library. It is powerful and, compared with Javassist and ASM, offers a far more convenient API for creating and modifying Java classes without any knowledge of the bytecode format, while being lighter-weight and faster

This project implements JDK and CGLIB dynamic proxies. The proxy-creation logic:
```java
public class ClientStubProxyFactory {

    private final ServiceDiscovery discovery;
    private final RpcClient rpcClient;
    private final RpcClientProperties properties;

    public ClientStubProxyFactory(ServiceDiscovery discovery, RpcClient rpcClient, RpcClientProperties properties) {
        this.discovery = discovery;
        this.rpcClient = rpcClient;
        this.properties = properties;
    }

    // cache proxies per service key, so each service gets a single proxy instance
    private static final Map<String, Object> proxyMap = new ConcurrentHashMap<>();

    public <T> T getProxy(Class<T> clazz, String version) {
        return (T) proxyMap.computeIfAbsent(ServiceUtil.serviceKey(clazz.getName(), version), serviceName -> {
            if (clazz.isInterface() || Proxy.isProxyClass(clazz)) {
                // interfaces go through the JDK dynamic proxy
                return Proxy.newProxyInstance(clazz.getClassLoader(),
                        new Class[]{clazz},
                        new ClientStubInvocationHandler(discovery, rpcClient, properties, serviceName));
            } else {
                // concrete classes are proxied with CGLIB
                Enhancer enhancer = new Enhancer();
                enhancer.setClassLoader(clazz.getClassLoader());
                enhancer.setSuperclass(clazz);
                enhancer.setCallback(new ClientStubMethodInterceptor(discovery, rpcClient, properties, serviceName));
                return enhancer.create();
            }
        });
    }
}
```
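ClientStubInvocationHandler itself is not listed in this post. Below is a minimal sketch of what such a handler plausibly does, pieced together from the RpcRequest fields, the discover method, and the response shape shown elsewhere in this article; the rpcClient.sendRequest signature is an assumption:

```java
public class ClientStubInvocationHandler implements InvocationHandler {

    private final ServiceDiscovery discovery;
    private final RpcClient rpcClient;
    private final RpcClientProperties properties;
    private final String serviceName;

    public ClientStubInvocationHandler(ServiceDiscovery discovery, RpcClient rpcClient,
                                       RpcClientProperties properties, String serviceName) {
        this.discovery = discovery;
        this.rpcClient = rpcClient;
        this.properties = properties;
        this.serviceName = serviceName;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // 1. describe the call as an RpcRequest
        RpcRequest request = new RpcRequest();
        request.setServiceName(serviceName);
        request.setMethod(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setParameterValues(args);

        // 2. pick a provider instance via the registry and the load-balancing strategy
        ServiceInfo serviceInfo = discovery.discover(request);

        // 3. send the request over the configured transport (hypothetical signature)
        //    and unwrap the response the same way the server handlers build it
        RpcResponse response = rpcClient.sendRequest(serviceInfo, request);
        if (response.getExceptionValue() != null) {
            throw response.getExceptionValue();
        }
        return response.getReturnValue();
    }
}
```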
Netty Heartbeats and Channel Reuse

Re-establishing a Netty connection for every single request is very costly, so we add a heartbeat mechanism to keep connections alive and reuse Channels:

- Long-lived connections: no new TCP connection per call, improving response time
- Channel reuse: avoids reconnecting to the same server over and over
- Multiplexing: a single TCP connection carries multiple interleaved requests and responses, cutting idle waiting on connections and reducing the number of connections needed at a given concurrency, which raises system throughput
Multiplexing implementation

A ConcurrentHashMap holds a pending-result slot for every in-flight request; when a response arrives, its sequenceId locates the corresponding Promise to complete:
```java
public static final Map<Integer, Promise<RpcMessage>> UNPROCESSED_RPC_RESPONSES = new ConcurrentHashMap<>();
```
Completing the Promise once the response has come back:
```java
protected void channelRead0(ChannelHandlerContext ctx, RpcMessage msg) throws Exception {
    try {
        MessageType type = MessageType.parseByType(msg.getHeader().getMessageType());
        if (type == MessageType.RESPONSE) {
            int sequenceId = msg.getHeader().getSequenceId();
            Promise<RpcMessage> promise = UNPROCESSED_RPC_RESPONSES.remove(sequenceId);
            if (promise != null) {
                Exception exception = ((RpcResponse) msg.getBody()).getExceptionValue();
                if (exception == null) {
                    promise.setSuccess(msg);
                } else {
                    promise.setFailure(exception);
                }
            }
        } else if (type == MessageType.HEARTBEAT_RESPONSE) {
            log.debug("Heartbeat info {}.", msg.getBody());
        }
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
```
After sending a request, the not-yet-completed Promise is stored:
```java
public RpcMessage sendRpcRequest(RequestMetadata requestMetadata) {
    Promise<RpcMessage> promise;
    Channel channel = getChannel(new InetSocketAddress(requestMetadata.getServerAddr(), requestMetadata.getPort()));
    if (channel.isActive()) {
        promise = new DefaultPromise<>(channel.eventLoop());
        int sequenceId = requestMetadata.getRpcMessage().getHeader().getSequenceId();
        RpcResponseHandler.UNPROCESSED_RPC_RESPONSES.put(sequenceId, promise);
        channel.writeAndFlush(requestMetadata.getRpcMessage()).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                log.debug("The client send the message successfully, msg: [{}].", requestMetadata);
            } else {
                future.channel().close();
                promise.setFailure(future.cause());
                log.error("The client send the message failed.", future.cause());
            }
        });
        Integer timeout = requestMetadata.getTimeout();
        if (timeout == null || timeout <= 0) {
            promise.await();
        } else {
            boolean success = promise.await(requestMetadata.getTimeout(), TimeUnit.MILLISECONDS);
            if (!success) {
                promise.setFailure(new TimeoutException(String.format("The Remote procedure call exceeded the "
                        + "specified timeout of %dms.", timeout)));
            }
        }
        if (promise.isSuccess()) {
            return promise.getNow();
        } else {
            throw new RpcException(promise.cause());
        }
    } else {
        throw new IllegalStateException("The channel is inactivate.");
    }
}
```
Long-connection implementation

A custom user-event handler is added to the client's response handler: when a write-idle event is detected, a heartbeat packet is sent automatically:
```java
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        if (((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) {
            log.warn("Write idle happen [{}].", ctx.channel().remoteAddress());
            RpcMessage rpcMessage = new RpcMessage();
            MessageHeader header = MessageHeader.build(SerializationType.KRYO.name());
            header.setMessageType(MessageType.HEARTBEAT_REQUEST.getType());
            rpcMessage.setHeader(header);
            rpcMessage.setBody(ProtocolConstants.PING);
            ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    } else {
        super.userEventTriggered(ctx, evt);
    }
}
```
Channel reuse

A ConcurrentHashMap caches the Channel used for each IP:port pair. When another rpc request targets the same address and the earlier Channel is still open, it is reused, avoiding a reconnect:
```java
public class ChannelProvider {

    private final Map<String, Channel> channels = new ConcurrentHashMap<>();

    public Channel get(String hostname, Integer port) {
        String key = hostname + ":" + port;
        if (channels.containsKey(key)) {
            Channel channel = channels.get(key);
            if (channel != null && channel.isActive()) {
                return channel;
            } else {
                channels.remove(key);
            }
        }
        return null;
    }

    public Channel get(InetSocketAddress inetSocketAddress) {
        return get(inetSocketAddress.getHostName(), inetSocketAddress.getPort());
    }

    public void set(String hostname, Integer port, Channel channel) {
        String key = hostname + ":" + port;
        channels.put(key, channel);
    }

    public void set(InetSocketAddress inetSocketAddress, Channel channel) {
        this.set(inetSocketAddress.getHostName(), inetSocketAddress.getPort(), channel);
    }
}
```
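The getChannel method used by sendRpcRequest above is not shown; a minimal sketch of how it might combine the ChannelProvider cache with a Netty Bootstrap (the bootstrap field is an assumption):

```java
private final ChannelProvider channelProvider = new ChannelProvider();

public Channel getChannel(InetSocketAddress address) {
    // reuse a live cached channel when one exists
    Channel channel = channelProvider.get(address);
    if (channel == null) {
        try {
            // otherwise connect and cache the new channel for subsequent requests
            channel = bootstrap.connect(address).sync().channel();
            channelProvider.set(address, channel);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Failed to connect to " + address, e);
        }
    }
    return channel;
}
```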
Spring Integration via Custom Annotations

First, the RpcService annotation, which carries the service interface, interface name, version, and other metadata the framework uses for service discovery, registration, and remote invocation:
```java
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RpcService {

    Class<?> interfaceClass() default void.class;

    String interfaceName() default "";

    String version() default "1.0";
}
```
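Marking a provider implementation then looks like this (HelloService and HelloServiceImpl are hypothetical examples):

```java
@RpcService(interfaceClass = HelloService.class, version = "1.0")
public class HelloServiceImpl implements HelloService {

    @Override
    public String sayHello(String name) {
        return "hello, " + name;
    }
}
```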
Next, an implementation of a Spring extension point that scans for components annotated with @RpcService and registers the corresponding BeanDefinition objects with Spring:
```java
public class RpcBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware {

    private ResourceLoader resourceLoader;

    @Override
    public void setResourceLoader(ResourceLoader resourceLoader) {
        this.resourceLoader = resourceLoader;
    }

    @Override
    public void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry registry) {
        AnnotationAttributes annotationAttributes = AnnotationAttributes
                .fromMap(annotationMetadata.getAnnotationAttributes(RpcComponentScan.class.getName()));
        String[] basePackages = {};
        if (annotationAttributes != null) {
            basePackages = annotationAttributes.getStringArray("basePackages");
        }
        if (basePackages.length == 0) {
            basePackages = new String[]{((StandardAnnotationMetadata) annotationMetadata)
                    .getIntrospectedClass().getPackage().getName()};
        }
        RpcClassPathBeanDefinitionScanner rpcServiceScanner =
                new RpcClassPathBeanDefinitionScanner(registry, RpcService.class);
        if (this.resourceLoader != null) {
            rpcServiceScanner.setResourceLoader(this.resourceLoader);
        }
        int count = rpcServiceScanner.scan(basePackages);
        log.info("The number of BeanDefinition scanned and registered by RpcServiceScanner is {}.", count);
    }
}
```
Then the RpcComponentScan annotation, which imports the registrar above:
```java
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(RpcBeanDefinitionRegistrar.class)
public @interface RpcComponentScan {

    @AliasFor("basePackages")
    String[] value() default {};

    @AliasFor("value")
    String[] basePackages() default {};
}
```
At this point the services have been registered with Spring through that chain; next, a Spring hook registers the service metadata with the registry automatically:
```java
public class RpcServerBeanPostProcessor implements BeanPostProcessor, CommandLineRunner {

    private final ServiceRegistry serviceRegistry;
    private final RpcServer rpcServer;
    private final RpcServerProperties properties;

    public RpcServerBeanPostProcessor(ServiceRegistry serviceRegistry, RpcServer rpcServer,
                                      RpcServerProperties properties) {
        this.serviceRegistry = serviceRegistry;
        this.rpcServer = rpcServer;
        this.properties = properties;
    }

    @SneakyThrows
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean.getClass().isAnnotationPresent(RpcService.class)) {
            log.info("[{}] is annotated with [{}].", bean.getClass().getName(), RpcService.class.getCanonicalName());
            RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
            String interfaceName;
            if ("".equals(rpcService.interfaceName())) {
                interfaceName = rpcService.interfaceClass().getName();
            } else {
                interfaceName = rpcService.interfaceName();
            }
            String version = rpcService.version();
            String serviceName = ServiceUtil.serviceKey(interfaceName, version);
            ServiceInfo serviceInfo = ServiceInfo.builder()
                    .appName(properties.getAppName())
                    .serviceName(serviceName)
                    .version(version)
                    .address(properties.getAddress())
                    .port(properties.getPort())
                    .build();
            serviceRegistry.register(serviceInfo);
            LocalServiceCache.addService(serviceName, bean);
        }
        return bean;
    }

    @Override
    public void run(String... args) throws Exception {
        new Thread(() -> rpcServer.start(properties.getPort())).start();
        log.info("Rpc server [{}] start, the appName is {}, the port is {}",
                rpcServer, properties.getAppName(), properties.getPort());
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                serviceRegistry.destroy();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }));
    }
}
```
The two annotations above serve the provider side. For the consumer side we implement RpcReference, which marks the fields or methods that should be replaced by dynamically generated remote-call client proxies:
```java
@Target({ElementType.FIELD, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RpcReference {

    Class<?> interfaceClass() default void.class;

    String interfaceName() default "";

    String version() default "1.0";

    String loadbalance() default "";

    String mock() default "";

    int timeout() default 0;
}
```
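On the consumer side, injecting a proxy then becomes a one-liner (the controller and HelloService are hypothetical examples):

```java
@RestController
public class HelloController {

    // the post-processor below swaps this field for a client stub proxy at startup
    @RpcReference(version = "1.0")
    private HelloService helloService;

    @GetMapping("/hello/{name}")
    public String hello(@PathVariable String name) {
        return helloService.sayHello(name);
    }
}
```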
And the machinery behind the annotation: after Spring finishes instantiating a bean, we scan its fields for @RpcReference and replace each annotated field with the corresponding client proxy object:
```java
public class RpcClientBeanPostProcessor implements BeanPostProcessor {

    private final ClientStubProxyFactory proxyFactory;

    public RpcClientBeanPostProcessor(ClientStubProxyFactory proxyFactory) {
        this.proxyFactory = proxyFactory;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        Field[] fields = bean.getClass().getDeclaredFields();
        for (Field field : fields) {
            if (field.isAnnotationPresent(RpcReference.class)) {
                RpcReference rpcReference = field.getAnnotation(RpcReference.class);
                Class<?> clazz = field.getType();
                try {
                    if (!"".equals(rpcReference.interfaceName())) {
                        clazz = Class.forName(rpcReference.interfaceName());
                    }
                    if (rpcReference.interfaceClass() != void.class) {
                        clazz = rpcReference.interfaceClass();
                    }
                    Object proxy = proxyFactory.getProxy(clazz, rpcReference.version());
                    field.setAccessible(true);
                    field.set(bean, proxy);
                } catch (ClassNotFoundException | IllegalAccessException e) {
                    throw new RpcException(String.format("Failed to obtain proxy object, the type of field %s is %s, "
                            + "and the specified loaded proxy type is %s.", field.getName(), field.getClass(), clazz), e);
                }
            }
        }
        return bean;
    }
}
```
Spring Boot Auto-Configuration

We write the corresponding auto-configuration classes plus a spring.factories file, so that pulling in the matching starter is enough to get everything configured automatically.
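The spring.factories file itself is not listed; assuming the configuration classes live under a package such as com.example.rpc.config (hypothetical), it would contain:

```
# META-INF/spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.example.rpc.config.RpcServerAutoConfiguration,\
  com.example.rpc.config.RpcClientAutoConfiguration
```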
The server-side configuration class is responsible for:

- registering services with the registry
- starting the RPC server to listen for connections
- enabling the bean post-processor
```java
@Configuration
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcServerAutoConfiguration {

    @Autowired
    RpcServerProperties properties;

    @Bean(name = "serviceRegistry")
    @Primary
    @ConditionalOnMissingBean
    @ConditionalOnProperty(prefix = "rpc.server", name = "registry", havingValue = "zookeeper", matchIfMissing = true)
    public ServiceRegistry zookeeperServiceRegistry() {
        return new ZookeeperServiceRegistry(properties.getRegistryAddr());
    }

    @Bean(name = "serviceRegistry")
    @ConditionalOnMissingBean
    @ConditionalOnProperty(prefix = "rpc.server", name = "registry", havingValue = "nacos")
    public ServiceRegistry nacosServiceRegistry() {
        return new NacosServiceRegistry(properties.getRegistryAddr());
    }

    @Bean(name = "rpcServer")
    @Primary
    @ConditionalOnMissingBean
    @ConditionalOnProperty(prefix = "rpc.server", name = "transport", havingValue = "netty", matchIfMissing = true)
    public RpcServer nettyRpcServer() {
        return new NettyRpcServer();
    }

    @Bean(name = "rpcServer")
    @ConditionalOnMissingBean
    @ConditionalOnProperty(prefix = "rpc.server", name = "transport", havingValue = "http")
    @ConditionalOnClass(name = {"org.apache.catalina.startup.Tomcat"})
    public RpcServer httpRpcServer() {
        return new HttpRpcServer();
    }

    @Bean(name = "rpcServer")
    @ConditionalOnMissingBean
    @ConditionalOnProperty(prefix = "rpc.server", name = "transport", havingValue = "socket")
    public RpcServer socketRpcServer() {
        return new SocketRpcServer();
    }

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnBean({ServiceRegistry.class, RpcServer.class})
    public RpcServerBeanPostProcessor rpcServerBeanPostProcessor(@Autowired ServiceRegistry serviceRegistry,
                                                                 @Autowired RpcServer rpcServer,
                                                                 @Autowired RpcServerProperties properties) {
        return new RpcServerBeanPostProcessor(serviceRegistry, rpcServer, properties);
    }
}
```
The client-side auto-configuration class is responsible for:

- configuring the load-balancing algorithm
- service discovery
- the client transport used to send requests and receive responses
- the dynamic proxy factory
- enabling the bean post-processor
- cleaning up components on exit
```java
@Configuration
@EnableConfigurationProperties(RpcClientProperties.class)
public class RpcClientAutoConfiguration {

    @Deprecated
    public RpcClientProperties rpcClientProperties(Environment environment) {
        BindResult<RpcClientProperties> bind = Binder.get(environment).bind("rpc.client", RpcClientProperties.class);
        return bind.get();
    }

    @Autowired
    RpcClientProperties rpcClientProperties;

    @Bean(name = "loadBalance")
    @Primary
    @ConditionalOnMissingBean
    @ConditionalOnProperty(prefix = "rpc.client", name = "loadbalance", havingValue = "random", matchIfMissing = true)
    public LoadBalance randomLoadBalance() {
        return new RandomLoadBalance();
    }

    @Bean(name = "loadBalance")
    @ConditionalOnMissingBean
    @ConditionalOnProperty(prefix = "rpc.client", name = "loadbalance", havingValue = "roundRobin")
    public LoadBalance roundRobinLoadBalance() {
        return new RoundRobinLoadBalance();
    }

    @Bean(name = "loadBalance")
    @ConditionalOnMissingBean
    @ConditionalOnProperty(prefix = "rpc.client", name = "loadbalance", havingValue = "consistentHash")
    public LoadBalance consistentHashLoadBalance() {
        return new ConsistentHashLoadBalance();
    }

    @Bean(name = "serviceDiscovery")
    @Primary
    @ConditionalOnMissingBean
    @ConditionalOnBean(LoadBalance.class)
    @ConditionalOnProperty(prefix = "rpc.client", name = "registry", havingValue = "zookeeper", matchIfMissing = true)
    public ServiceDiscovery zookeeperServiceDiscovery(@Autowired LoadBalance loadBalance) {
        return new ZookeeperServiceDiscovery(rpcClientProperties.getRegistryAddr(), loadBalance);
    }

    @Bean(name = "serviceDiscovery")
    @ConditionalOnMissingBean
    @ConditionalOnBean(LoadBalance.class)
    @ConditionalOnProperty(prefix = "rpc.client", name = "registry", havingValue = "nacos")
    public ServiceDiscovery nacosServiceDiscovery(@Autowired LoadBalance loadBalance) {
        return new NacosServiceDiscovery(rpcClientProperties.getRegistryAddr(), loadBalance);
    }

    @Bean(name = "rpcClient")
    @Primary
    @ConditionalOnMissingBean
    @ConditionalOnProperty(prefix = "rpc.client", name = "transport", havingValue = "netty", matchIfMissing = true)
    public RpcClient nettyRpcClient() {
        return new NettyRpcClient();
    }

    @Bean(name = "rpcClient")
    @ConditionalOnMissingBean
    @ConditionalOnProperty(prefix = "rpc.client", name = "transport", havingValue = "http")
    public RpcClient httpRpcClient() {
        return new HttpRpcClient();
    }

    @Bean(name = "rpcClient")
    @ConditionalOnMissingBean
    @ConditionalOnProperty(prefix = "rpc.client", name = "transport", havingValue = "socket")
    public RpcClient socketRpcClient() {
        return new SocketRpcClient();
    }

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnBean({ServiceDiscovery.class, RpcClient.class})
    public ClientStubProxyFactory clientStubProxyFactory(@Autowired ServiceDiscovery serviceDiscovery,
                                                         @Autowired RpcClient rpcClient,
                                                         @Autowired RpcClientProperties rpcClientProperties) {
        return new ClientStubProxyFactory(serviceDiscovery, rpcClient, rpcClientProperties);
    }

    @Bean
    @ConditionalOnMissingBean
    public RpcClientBeanPostProcessor rpcClientBeanPostProcessor(@Autowired ClientStubProxyFactory clientStubProxyFactory) {
        return new RpcClientBeanPostProcessor(clientStubProxyFactory);
    }

    @Bean
    @ConditionalOnMissingBean
    public RpcClientExitDisposableBean rpcClientExitDisposableBean(@Autowired ServiceDiscovery serviceDiscovery) {
        return new RpcClientExitDisposableBean(serviceDiscovery);
    }
}
```
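Based on the @ConditionalOnProperty prefixes and values above, an application using the starters might be configured roughly like this; the exact property names (e.g. registry-addr, inferred from getRegistryAddr()) are assumptions:

```yaml
rpc:
  client:
    registry: zookeeper          # or: nacos
    registry-addr: 127.0.0.1:2181
    transport: netty             # or: http / socket
    loadbalance: random          # or: roundRobin / consistentHash
  server:
    registry: zookeeper
    registry-addr: 127.0.0.1:2181
    transport: netty
    port: 8880                   # hypothetical service port
```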