扩展序列化算法 序列化,反序列化主要用在消息正文的转换上
序列化时,需要将Java对象变为要传输的数据(可以是byte[],json等,最终都需要变成byte[])
反序列化时,需要将传入的正文数据还原成Java对象,便于处理
Java自带的序列化,反序列化机制,核心代码如下:
byte [] body = new byte [bodyLength];byteByf.readBytes(body); ObjectInputStream in = new ObjectInputStream (new ByteArrayInputStream (body));Message message = (Message) in.readObject();message.setSequenceId(sequenceId); ByteArrayOutputStream out = new ByteArrayOutputStream ();new ObjectOutputStream (out).writeObject(message);byte [] bytes = out.toByteArray();
为了支持更多序列化算法,抽象一个Serializer接口,使用枚举类实现具体的算法
public enum SerializerAlgorithm implements Serializer { Jdk(0 ), Json(1 ), Protostuff(2 ), Kryo(3 ), Hessian(4 ); private final int type; SerializerAlgorithm(int type) { this .type = type; } public int getType () { return type; } Jdk { @Override public <T> T deserialize (Class<T> clazz, byte [] bytes) { try { ObjectInputStream in = new ObjectInputStream (new ByteArrayInputStream (bytes)); Object object = in.readObject(); return (T) object; } catch (IOException | ClassNotFoundException e) { throw new RuntimeException ("SerializerAlgorithm.Java 反序列化错误" , e); } } @Override public <T> byte [] serialize(T object) { try { ByteArrayOutputStream out = new ByteArrayOutputStream (); new ObjectOutputStream (out).writeObject(object); return out.toByteArray(); } catch (IOException e) { throw new RuntimeException ("SerializerAlgorithm.Java 序列化错误" , e); } } }, Json { @Override public <T> T deserialize (Class<T> clazz, byte [] bytes) { return new Gson ().fromJson(new String (bytes, StandardCharsets.UTF_8), clazz); } @Override public <T> byte [] serialize(T object) { return new Gson ().toJson(object).getBytes(StandardCharsets.UTF_8); } }; public static SerializerAlgorithm getByInt (int type) { SerializerAlgorithm[] array = SerializerAlgorithm.values(); if (type < 0 || type > array.length - 1 ) { throw new IllegalArgumentException ("超过 SerializerAlgorithm 范围" ); } return array[type]; }, Protostuff { @Override public <T> T deserialize (Class<T> clazz, byte [] bytes) { Schema<T> schema = RuntimeSchema.getSchema(clazz); T obj = schema.newMessage(); ProtostuffIOUtil.mergeFrom(bytes, obj, schema); return obj; } @Override public <T> byte [] serialize(T object) { Schema<T> schema = RuntimeSchema.getSchema((Class<T>) object.getClass()); LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); try { return ProtostuffIOUtil.toByteArray(object, schema, buffer); } finally { buffer.clear(); } } }, Kryo { private final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> { Kryo kryo = new Kryo (); kryo.setRegistrationRequired(false ); return kryo; }); @Override public <T> T deserialize (Class<T> clazz, byte [] bytes) { Kryo kryo = kryoThreadLocal.get(); Input input = new Input (new ByteArrayInputStream (bytes)); return kryo.readObject(input, clazz); } @Override public <T> byte [] serialize(T object) { Kryo kryo = kryoThreadLocal.get(); ByteArrayOutputStream out = new ByteArrayOutputStream (); Output output = new Output (out); kryo.writeObject(output, object); output.close(); return out.toByteArray(); } }, Hessian { @Override public <T> T deserialize (Class<T> clazz, byte [] bytes) { try { HessianInput input = new HessianInput (new ByteArrayInputStream (bytes)); return (T) input.readObject(); } catch (IOException e) { throw new RuntimeException ("Hessian 反序列化失败" , e); } } @Override public <T> byte [] serialize(T object) { try (ByteArrayOutputStream out = new ByteArrayOutputStream ()) { HessianOutput output = new HessianOutput (out); output.writeObject(object); return out.toByteArray(); } catch (IOException e) { throw new RuntimeException ("Hessian 序列化失败" , e); } } }; public static SerializerAlgorithm getByInt (int type) { SerializerAlgorithm[] array = SerializerAlgorithm.values(); if (type < 0 || type > array.length - 1 ) { throw new IllegalArgumentException ("超过 SerializerAlgorithm 范围" ); } return array[type]; } }
增加配置类和配置文件
public abstract class Config { static Properties properties; static { try (InputStream in = Config.class.getResourceAsStream("/application.properties" )) { properties = new Properties (); properties.load(in); } catch (IOException e) { throw new ExceptionInInitializerError (e); } } public static int getServerPort () { String value = properties.getProperty("server.port" ); if (value == null ) { return 8080 ; } else { return Integer.parseInt(value); } } public static Serializer.Algorithm getSerializerAlgorithm () { String value = properties.getProperty("serializer.algorithm" ); if (value == null ) { return Serializer.Algorithm.Java; } else { return Serializer.Algorithm.valueOf(value); } } }
修改编解码器
public class MessageCodecSharable extends MessageToMessageCodec <ByteBuf, Message> { @Override public void encode (ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception { ByteBuf out = ctx.alloc().buffer(); out.writeBytes(new byte []{1 , 2 , 3 , 4 }); out.writeByte(1 ); out.writeByte(Config.getSerializerAlgorithm().ordinal()); out.writeByte(msg.getMessageType()); out.writeInt(msg.getSequenceId()); out.writeByte(0xff ); byte [] bytes = Config.getSerializerAlgorithm().serialize(msg); out.writeInt(bytes.length); out.writeBytes(bytes); outList.add(out); } @Override protected void decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int magicNum = in.readInt(); byte version = in.readByte(); byte serializerAlgorithm = in.readByte(); byte messageType = in.readByte(); int sequenceId = in.readInt(); in.readByte(); int length = in.readInt(); byte [] bytes = new byte [length]; in.readBytes(bytes, 0 , length); Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm]; Class<? extends Message > messageClass = Message.getMessageClass(messageType); Message message = algorithm.deserialize(messageClass, bytes); out.add(message); } }
参数调优 CONNECT_TIMEOUT_MILLIS
@Slf4j public class TestConnectionTimeout { public static void main (String[] args) { NioEventLoopGroup group = new NioEventLoopGroup (); try { Bootstrap bootstrap = new Bootstrap () .group(group) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300 ) .channel(NioSocketChannel.class) .handler(new LoggingHandler ()); ChannelFuture future = bootstrap.connect("127.0.0.1" , 8080 ); future.sync().channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); log.debug("timeout" ); } finally { group.shutdownGracefully(); } } }
SO_BACKLOG
属于 ServerSocketChannal 参数
用于设置服务端监听套接字(ServerSocket)可接受的连接请求队列的长度
回忆一下TCP的三次握手:
sequenceDiagram participant c as client participant s as server participant sq as syns queue participant aq as accept queue s ->> s : bind() s ->> s : listen() c ->> c : connect() c ->> s : 1. SYN Note left of c : SYN_SEND s ->> sq : put Note right of s : SYN_RCVD s ->> c : 2. SYN + ACK Note left of c : ESTABLISHED c ->> s : 3. ACK sq ->> aq : put Note right of s : ESTABLISHED aq -->> s : s ->> s : accept()
第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue
netty中可以通过 option(ChannelOption.SO_BACKLOG, 值)来设置大小
ulimit -n
属于操作系统参数
用于设置当前用户能打开的最大文件描述符数量
TCP_NODELAY
属于 SocketChannal 参数
禁用 Nagle 算法,牺牲带宽效率,换来低延迟
ServerBootstrap bootstrap = new ServerBootstrap ();bootstrap.childOption(ChannelOption.TCP_NODELAY, true );
SO_SNDBUF & SO_RCVBUF
SO_SNDBUF 属于 SocketChannal 参数,socket发送缓冲区大小
SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上),socket接受缓冲区大小
ServerBootstrap bootstrap = new ServerBootstrap ();bootstrap.childOption(ChannelOption.SO_SNDBUF, 1024 * 64 ); bootstrap.childOption(ChannelOption.SO_RCVBUF, 1024 * 64 );
ALLOCATOR
属于 SocketChannal 参数
用来内存分配和缓冲区管理,分配 ByteBuf,ctx.alloc()
ServerBootstrap bootstrap = new ServerBootstrap ();bootstrap .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
RCVBUF_ALLOCATOR 默认情况下,Netty 会循环调用 read()
将数据从 socket 读取到内存,但:
读太少 → 系统调用频繁,吞吐低;
读太多 → 无效分配,浪费内存,还可能引发 OOM。
为此,Netty 引入了 RecvByteBufAllocator
接口,允许动态调整每次读取的字节数,提高效率。
属于 SocketChannal 参数
控制 netty 接收缓冲区大小
负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定
ServerBootstrap bootstrap = new ServerBootstrap ();bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator (64 , 1024 , 65536 ) );