扩展序列化算法

序列化,反序列化主要用在消息正文的转换上

  • 序列化时,需要将Java对象变为要传输的数据(可以是byte[],json等,最终都需要变成byte[])
  • 反序列化时,需要将传入的正文数据还原成Java对象,便于处理

Java自带的序列化,反序列化机制,核心代码如下:

// 反序列化
byte[] body = new byte[bodyLength];
byteByf.readBytes(body);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
Message message = (Message) in.readObject();
message.setSequenceId(sequenceId);

// 序列化
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(message);
byte[] bytes = out.toByteArray();

为了支持更多序列化算法,抽象一个Serializer接口,使用枚举类实现具体的算法

public enum SerializerAlgorithm implements Serializer {
Jdk(0), Json(1), Protostuff(2), Kryo(3), Hessian(4);

private final int type;

SerializerAlgorithm(int type) {
this.type = type;
}

public int getType() {
return type;
}

// Jdk
Jdk {
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
try {
ObjectInputStream in =
new ObjectInputStream(new ByteArrayInputStream(bytes));
Object object = in.readObject();
return (T) object;
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("SerializerAlgorithm.Java 反序列化错误", e);
}
}

@Override
public <T> byte[] serialize(T object) {
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(object);
return out.toByteArray();
} catch (IOException e) {
throw new RuntimeException("SerializerAlgorithm.Java 序列化错误", e);
}
}
},
// Json 实现(引入了 Gson 依赖)
Json {
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);
}

@Override
public <T> byte[] serialize(T object) {
return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8);
}
};

// 需要从协议的字节中得到是哪种序列化算法
public static SerializerAlgorithm getByInt(int type) {
SerializerAlgorithm[] array = SerializerAlgorithm.values();
if (type < 0 || type > array.length - 1) {
throw new IllegalArgumentException("超过 SerializerAlgorithm 范围");
}
return array[type];
},
// Protostuff 实现
Protostuff {
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
// 获取类对应的 schema(序列化元数据)
Schema<T> schema = RuntimeSchema.getSchema(clazz);
T obj = schema.newMessage(); // 创建对象实例
// 将字节数据合并反序列化到对象中
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}

@Override
public <T> byte[] serialize(T object) {
// 获取运行时 schema
Schema<T> schema = RuntimeSchema.getSchema((Class<T>) object.getClass());
// 创建缓冲区
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
return ProtostuffIOUtil.toByteArray(object, schema, buffer);
} finally {
buffer.clear();
}
}
},
// Kryo 实现
Kryo {
// 使用 ThreadLocal 确保 Kryo 在多线程中安全使用
private final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
kryo.setRegistrationRequired(false); // 允许动态类型注册
return kryo;
});

@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
Kryo kryo = kryoThreadLocal.get();
Input input = new Input(new ByteArrayInputStream(bytes));
// 使用 Kryo 反序列化对象
return kryo.readObject(input, clazz);
}

@Override
public <T> byte[] serialize(T object) {
Kryo kryo = kryoThreadLocal.get();
ByteArrayOutputStream out = new ByteArrayOutputStream();
Output output = new Output(out);
// 使用 Kryo 序列化对象
kryo.writeObject(output, object);
output.close();
return out.toByteArray();
}
},
// Hessian 实现
Hessian {
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
try {
// 创建 Hessian 输入流
HessianInput input = new HessianInput(new ByteArrayInputStream(bytes));
return (T) input.readObject(); // 反序列化对象
} catch (IOException e) {
throw new RuntimeException("Hessian 反序列化失败", e);
}
}

@Override
public <T> byte[] serialize(T object) {
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
// 创建 Hessian 输出流并序列化对象
HessianOutput output = new HessianOutput(out);
output.writeObject(object);
return out.toByteArray();
} catch (IOException e) {
throw new RuntimeException("Hessian 序列化失败", e);
}
}
};

// 序列化类型编号(建议用于协议中)
public static SerializerAlgorithm getByInt(int type) {
SerializerAlgorithm[] array = SerializerAlgorithm.values();
if (type < 0 || type > array.length - 1) {
throw new IllegalArgumentException("超过 SerializerAlgorithm 范围");
}
return array[type];
}
}

增加配置类和配置文件

public abstract class Config {
static Properties properties;
static {
try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
properties = new Properties();
properties.load(in);
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
}
public static int getServerPort() {
String value = properties.getProperty("server.port");
if(value == null) {
return 8080;
} else {
return Integer.parseInt(value);
}
}
public static Serializer.Algorithm getSerializerAlgorithm() {
String value = properties.getProperty("serializer.algorithm");
if(value == null) {
return Serializer.Algorithm.Java;
} else {
return Serializer.Algorithm.valueOf(value);
}
}
}

修改编解码器

/**
* 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
@Override
public void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
ByteBuf out = ctx.alloc().buffer();
// 1. 4 字节的魔数
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版本,
out.writeByte(1);
// 3. 1 字节的序列化方式
out.writeByte(Config.getSerializerAlgorithm().ordinal());
// 4. 1 字节的指令类型
out.writeByte(msg.getMessageType());
// 5. 4 个字节
out.writeInt(msg.getSequenceId());
// 无意义,对齐填充
out.writeByte(0xff);
// 6. 获取内容的字节数组
byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容
out.writeBytes(bytes);
outList.add(out);
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerAlgorithm = in.readByte();
byte messageType = in.readByte(); // 0,1,2...
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);

// 找到反序列化算法
Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm];
// 确定具体消息类型
Class<? extends Message> messageClass = Message.getMessageClass(messageType);
Message message = algorithm.deserialize(messageClass, bytes);
// log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
// log.debug("{}", message);
out.add(message);
}
}

参数调优

CONNECT_TIMEOUT_MILLIS

  • 属于SocketChannal参数

  • 用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常

  • 与SO_TIMEOUT的区别:SO_TIMEOUT主要用在阻塞IO,阻塞IO中accept,read等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间

@Slf4j
public class TestConnectionTimeout {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler());
ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
future.sync().channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
log.debug("timeout");
} finally {
group.shutdownGracefully();
}
}
}

SO_BACKLOG

  • 属于 ServerSocketChannal 参数
  • 用于设置服务端监听套接字(ServerSocket)可接受的连接请求队列的长度

回忆一下TCP的三次握手:

sequenceDiagram

participant c as client
participant s as server
participant sq as syns queue
participant aq as accept queue

s ->> s : bind()
s ->> s : listen()
c ->> c : connect()
c ->> s : 1. SYN
Note left of c : SYN_SEND
s ->> sq : put
Note right of s : SYN_RCVD
s ->> c : 2. SYN + ACK
Note left of c : ESTABLISHED
c ->> s : 3. ACK
sq ->> aq : put
Note right of s : ESTABLISHED
aq -->> s :
s ->> s : accept()
  1. 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
  2. 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
  3. 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue
  • 在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制

  • sync queue - 半连接队列

    • 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
  • accept queue - 全连接队列

    • 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
    • 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client

netty中可以通过 option(ChannelOption.SO_BACKLOG, 值)来设置大小

ulimit -n

  • 属于操作系统参数
  • 用于设置当前用户能打开的最大文件描述符数量

TCP_NODELAY

  • 属于 SocketChannal 参数
  • 禁用 Nagle 算法,牺牲带宽效率,换来低延迟
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);

SO_SNDBUF & SO_RCVBUF

  • SO_SNDBUF 属于 SocketChannal 参数,socket发送缓冲区大小
  • SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上),socket接受缓冲区大小
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.SO_SNDBUF, 1024 * 64); // 64KB 发送缓冲区
bootstrap.childOption(ChannelOption.SO_RCVBUF, 1024 * 64); // 64KB 接收缓冲区

ALLOCATOR

  • 属于 SocketChannal 参数
  • 用来内存分配和缓冲区管理,分配 ByteBuf,ctx.alloc()
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); // 默认

RCVBUF_ALLOCATOR

默认情况下,Netty 会循环调用 read() 将数据从 socket 读取到内存,但:

  • 读太少 → 系统调用频繁,吞吐低;
  • 读太多 → 无效分配,浪费内存,还可能引发 OOM。

为此,Netty 引入了 RecvByteBufAllocator 接口,允许动态调整每次读取的字节数,提高效率。

  • 属于 SocketChannal 参数
  • 控制 netty 接收缓冲区大小
  • 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定
ServerBootstrap bootstrap = new ServerBootstrap();

bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(64, 1024, 65536)
);