四. 优化与源码
1. 优化
1.1 扩展序列化算法
序列化,反序列化主要用在消息正文的转换上
序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理
目前的代码仅支持 Java 自带的序列化,反序列化机制,核心代码如下
1 2 3 4 5 6 7 8 9 10 11 byte [] body = new byte [bodyLength];byteByf.readBytes(body); ObjectInputStream in = new ObjectInputStream (new ByteArrayInputStream (body));Message message = (Message) in.readObject();message.setSequenceId(sequenceId); ByteArrayOutputStream out = new ByteArrayOutputStream ();new ObjectOutputStream (out).writeObject(message);byte [] bytes = out.toByteArray();
为了支持更多序列化算法,抽象一个 Serializer 接口
1 2 3 4 5 6 7 8 9 public interface Serializer { <T> T deserialize (Class<T> clazz, byte [] bytes) ; <T> byte [] serialize(T object); }
提供两个实现,我这里直接将实现加入了枚举类 Serializer.Algorithm 中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 enum SerializerAlgorithm implements Serializer { Java { @Override public <T> T deserialize (Class<T> clazz, byte [] bytes) { try { ObjectInputStream in = new ObjectInputStream (new ByteArrayInputStream (bytes)); Object object = in.readObject(); return (T) object; } catch (IOException | ClassNotFoundException e) { throw new RuntimeException ("SerializerAlgorithm.Java 反序列化错误" , e); } } @Override public <T> byte [] serialize(T object) { try { ByteArrayOutputStream out = new ByteArrayOutputStream (); new ObjectOutputStream (out).writeObject(object); return out.toByteArray(); } catch (IOException e) { throw new RuntimeException ("SerializerAlgorithm.Java 序列化错误" , e); } } }, Json { @Override public <T> T deserialize (Class<T> clazz, byte [] bytes) { return new Gson ().fromJson(new String (bytes, StandardCharsets.UTF_8), clazz); } @Override public <T> byte [] serialize(T object) { return new Gson ().toJson(object).getBytes(StandardCharsets.UTF_8); } }; public static SerializerAlgorithm getByInt (int type) { SerializerAlgorithm[] array = SerializerAlgorithm.values(); if (type < 0 || type > array.length - 1 ) { throw new IllegalArgumentException ("超过 SerializerAlgorithm 范围" ); } return array[type]; } }
增加配置类和配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public abstract class Config { static Properties properties; static { try (InputStream in = Config.class.getResourceAsStream("/application.properties" )) { properties = new Properties (); properties.load(in); } catch (IOException e) { throw new ExceptionInInitializerError (e); } } public static int getServerPort () { String value = properties.getProperty("server.port" ); if (value == null ) { return 8080 ; } else { return Integer.parseInt(value); } } public static Serializer.Algorithm getSerializerAlgorithm () { String value = properties.getProperty("serializer.algorithm" ); if (value == null ) { return Serializer.Algorithm.Java; } else { return Serializer.Algorithm.valueOf(value); } } }
配置文件
1 serializer.algorithm =Json
修改编解码器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public class MessageCodecSharable extends MessageToMessageCodec <ByteBuf, Message> { @Override public void encode (ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception { ByteBuf out = ctx.alloc().buffer(); out.writeBytes(new byte []{1 , 2 , 3 , 4 }); out.writeByte(1 ); out.writeByte(Config.getSerializerAlgorithm().ordinal()); out.writeByte(msg.getMessageType()); out.writeInt(msg.getSequenceId()); out.writeByte(0xff ); byte [] bytes = Config.getSerializerAlgorithm().serialize(msg); out.writeInt(bytes.length); out.writeBytes(bytes); outList.add(out); } @Override protected void decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int magicNum = in.readInt(); byte version = in.readByte(); byte serializerAlgorithm = in.readByte(); byte messageType = in.readByte(); int sequenceId = in.readInt(); in.readByte(); int length = in.readInt(); byte [] bytes = new byte [length]; in.readBytes(bytes, 0 , length); Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm]; Class<? extends Message > messageClass = Message.getMessageClass(messageType); Message message = algorithm.deserialize(messageClass, bytes); out.add(message); } }
其中确定具体消息类型,可以根据 消息类型字节
获取到对应的 消息 class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 @Data public abstract class Message implements Serializable { public static Class<? extends Message > getMessageClass(int messageType) { return messageClasses.get(messageType); } private int sequenceId; private int messageType; public abstract int getMessageType () ; public static final int LoginRequestMessage = 0 ; public static final int LoginResponseMessage = 1 ; public static final int ChatRequestMessage = 2 ; public static final int ChatResponseMessage = 3 ; public static final int GroupCreateRequestMessage = 4 ; public static final int GroupCreateResponseMessage = 5 ; public static final int GroupJoinRequestMessage = 6 ; public static final int GroupJoinResponseMessage = 7 ; public static final int GroupQuitRequestMessage = 8 ; public static final int GroupQuitResponseMessage = 9 ; public static final int GroupChatRequestMessage = 10 ; public static final int GroupChatResponseMessage = 11 ; public static final int GroupMembersRequestMessage = 12 ; public static final int GroupMembersResponseMessage = 13 ; public static final int PingMessage = 14 ; public static final int PongMessage = 15 ; private static final Map<Integer, Class<? extends Message >> messageClasses = new HashMap <>(); static { messageClasses.put(LoginRequestMessage, LoginRequestMessage.class); messageClasses.put(LoginResponseMessage, LoginResponseMessage.class); messageClasses.put(ChatRequestMessage, ChatRequestMessage.class); messageClasses.put(ChatResponseMessage, ChatResponseMessage.class); messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class); messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class); messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class); messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class); messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class); messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class); messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class); messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class); messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class); messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class); } }
1.2 参数调优
1)CONNECT_TIMEOUT_MILLIS
属于 SocketChannal 参数
用在客户端 建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常
SO_TIMEOUT 主要用在阻塞 IO,阻塞 IO 中 accept,read 等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间(但是对于netty来说是不需要的,因为netty中的accept、read是非阻塞的)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Slf4j public class TestConnectionTimeout { public static void main (String[] args) { NioEventLoopGroup group = new NioEventLoopGroup (); try { Bootstrap bootstrap = new Bootstrap () .group(group) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300 ) .channel(NioSocketChannel.class) .handler(new LoggingHandler ()); ChannelFuture future = bootstrap.connect("127.0.0.1" , 8080 ); future.sync().channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); log.debug("timeout" ); } finally { group.shutdownGracefully(); } } }
另外源码部分 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override public final void connect ( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0 ) { connectTimeoutFuture = eventLoop().schedule(new Runnable () { @Override public void run () { ChannelPromise connectPromise = AbstractNioChannel.this .connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException ("connection timed out: " + remoteAddress); if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } }
2)SO_BACKLOG
属于 ServerSocketChannal 参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 sequenceDiagram participant c as client participant s as server participant sq as syns queue participant aq as accept queue s ->> s : bind() s ->> s : listen() c ->> c : connect() c ->> s : 1. SYN Note left of c : SYN_SEND s ->> sq : put Note right of s : SYN_RCVD s ->> c : 2. SYN + ACK Note left of c : ESTABLISHED c ->> s : 3. ACK sq ->> aq : put Note right of s : ESTABLISHED aq -->> s : s ->> s : accept()
第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue (完成了三次握手才会进入 accept queue)
ps : 三次握手是发生在accept之前的。为什么第三次握手成功后不直接拿去用而是将请求从sync queue当如到 accept queue中呢,是因为服务器端进行accept的能力是有限的,比如客户端连接量特别大,accept就可能忙不过来了,这时就需要将已经完成三次握手的建立成功的信息放入到全连接队列中,服务器就可以从容不迫地accept了。
其中
netty 中
可以通过 .option(ChannelOption.SO_BACKLOG, 值) 来设置大小
可以通过下面源码查看默认大小
1 2 3 4 5 6 public class DefaultServerSocketChannelConfig extends DefaultChannelConfig implements ServerSocketChannelConfig { private volatile int backlog = NetUtil.SOMAXCONN; }
课堂调试关键断点为:io.netty.channel.nio.NioEventLoop#processSelectedKey
oio 中更容易说明,不用 debug 模式
1 2 3 4 5 6 7 8 public class Server { public static void main (String[] args) throws IOException { ServerSocket ss = new ServerSocket (8888 , 2 ); Socket accept = ss.accept(); System.out.println(accept); System.in.read(); } }
客户端启动 4 个
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class Client { public static void main (String[] args) throws IOException { try { Socket s = new Socket (); System.out.println(new Date ()+" connecting..." ); s.connect(new InetSocketAddress ("localhost" , 8888 ),1000 ); System.out.println(new Date ()+" connected..." ); s.getOutputStream().write(1 ); System.in.read(); } catch (IOException e) { System.out.println(new Date ()+" connecting timeout..." ); e.printStackTrace(); } } }
第 1,2,3 个客户端都打印,但除了第一个处于 accpet 外,其它两个都处于 accept queue 中
1 2 Tue Apr 21 20 :30 :28 CST 2020 connecting... Tue Apr 21 20 :30 :28 CST 2020 connected...
第 4 个客户端连接时
1 2 3 Tue Apr 21 20:53:58 CST 2020 connecting... Tue Apr 21 20:53:59 CST 2020 connecting timeout... java.net.SocketTimeoutException: connect timed out
查看某一变量的默认值的思路:
笔记:find usage可查看变量在什么地方被引用 ,然后沿着这个变量的赋值量去找到它初始赋值的位置,Idea右侧光亮处表示当前被选中的变量被使用
3)ulimit -n (允许一个进程能够同时打开文件描述符的数量)
4)TCP_NODELAY ()
5)SO_SNDBUF & SO_RCVBUF
SO_SNDBUF 属于 SocketChannal 参数
SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)
6)ALLOCATOR
属于 SocketChannal 参数
用来分配 ByteBuf, ctx.alloc()
7)RCVBUF_ALLOCATOR
属于 SocketChannal 参数
控制 netty 接收缓冲区大小
负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定
1.3 RPC 框架
1)准备工作
这些代码可以认为是现成的,无需从头编写练习
为了简化起见,在原来聊天项目的基础上新增 Rpc 请求和响应消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Data public abstract class Message implements Serializable { public static final int RPC_MESSAGE_TYPE_REQUEST = 101 ; public static final int RPC_MESSAGE_TYPE_RESPONSE = 102 ; static { messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class); messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class); } }
请求消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Getter @ToString(callSuper = true) public class RpcRequestMessage extends Message { private String interfaceName; private String methodName; private Class<?> returnType; private Class[] parameterTypes; private Object[] parameterValue; public RpcRequestMessage (int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) { super .setSequenceId(sequenceId); this .interfaceName = interfaceName; this .methodName = methodName; this .returnType = returnType; this .parameterTypes = parameterTypes; this .parameterValue = parameterValue; } @Override public int getMessageType () { return RPC_MESSAGE_TYPE_REQUEST; } }
响应消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Data @ToString(callSuper = true) public class RpcResponseMessage extends Message { private Object returnValue; private Exception exceptionValue; @Override public int getMessageType () { return RPC_MESSAGE_TYPE_RESPONSE; } }
服务器架子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Slf4j public class RpcServer { public static void main (String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup (); NioEventLoopGroup worker = new NioEventLoopGroup (); LoggingHandler LOGGING_HANDLER = new LoggingHandler (LogLevel.DEBUG); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable (); RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler (); try { ServerBootstrap serverBootstrap = new ServerBootstrap (); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProcotolFrameDecoder ()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); ch.pipeline().addLast(RPC_HANDLER); } }); Channel channel = serverBootstrap.bind(8080 ).sync().channel(); channel.closeFuture().sync(); } catch (InterruptedException e) { log.error("server error" , e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
客户端架子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class RpcClient { public static void main (String[] args) { NioEventLoopGroup group = new NioEventLoopGroup (); LoggingHandler LOGGING_HANDLER = new LoggingHandler (LogLevel.DEBUG); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable (); RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler (); try { Bootstrap bootstrap = new Bootstrap (); bootstrap.channel(NioSocketChannel.class); bootstrap.group(group); bootstrap.handler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProcotolFrameDecoder ()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); ch.pipeline().addLast(RPC_HANDLER); } }); Channel channel = bootstrap.connect("localhost" , 8080 ).sync().channel(); channel.closeFuture().sync(); } catch (Exception e) { log.error("client error" , e); } finally { group.shutdownGracefully(); } } }
服务器端的 service 获取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class ServicesFactory { static Properties properties; static Map<Class<?>, Object> map = new ConcurrentHashMap <>(); static { try (InputStream in = Config.class.getResourceAsStream("/application.properties" )) { properties = new Properties (); properties.load(in); Set<String> names = properties.stringPropertyNames(); for (String name : names) { if (name.endsWith("Service" )) { Class<?> interfaceClass = Class.forName(name); Class<?> instanceClass = Class.forName(properties.getProperty(name)); map.put(interfaceClass, instanceClass.newInstance()); } } } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) { throw new ExceptionInInitializerError (e); } } public static <T> T getService (Class<T> interfaceClass) { return (T) map.get(interfaceClass); } }
相关配置 application.properties
1 2 serializer.algorithm=Json cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl
2)服务器 handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Slf4j @ChannelHandler .Sharablepublic class RpcRequestMessageHandler extends SimpleChannelInboundHandler <RpcRequestMessage> { @Override protected void channelRead0 (ChannelHandlerContext ctx, RpcRequestMessage message) { RpcResponseMessage response = new RpcResponseMessage (); response.setSequenceId(message.getSequenceId()); try { HelloService service = (HelloService) ServicesFactory.getService(Class.forName(message.getInterfaceName())); Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes()); Object invoke = method.invoke(service, message.getParameterValue()); response.setReturnValue(invoke); } catch (Exception e) { e.printStackTrace(); response.setExceptionValue(e); } ctx.writeAndFlush(response); } }
3)客户端代码第一版
只发消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 @Slf4j public class RpcClient { public static void main (String[] args) { NioEventLoopGroup group = new NioEventLoopGroup (); LoggingHandler LOGGING_HANDLER = new LoggingHandler (LogLevel.DEBUG); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable (); RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler (); try { Bootstrap bootstrap = new Bootstrap (); bootstrap.channel(NioSocketChannel.class); bootstrap.group(group); bootstrap.handler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProcotolFrameDecoder ()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); ch.pipeline().addLast(RPC_HANDLER); } }); Channel channel = bootstrap.connect("localhost" , 8080 ).sync().channel(); ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage ( 1 , "cn.itcast.server.service.HelloService" , "sayHello" , String.class, new Class []{String.class}, new Object []{"张三" } )).addListener(promise -> { if (!promise.isSuccess()) { Throwable cause = promise.cause(); log.error("error" , cause); } }); channel.closeFuture().sync(); } catch (Exception e) { log.error("client error" , e); } finally { group.shutdownGracefully(); } } }
理一下流程:
客户端发送请求消息,请求消息就找到 Pipeline 中的出战处理器,从下向上依次执行,比如这里只有两个出战处理器,先通过 MESSAGE_CODEC处理器 对请求消息进行编码,然后记录日志,最后请求消息就发出去了
消息发出之后,服务器端就拿到消息,就进行入站处理,做半包、黏包处理,记录日志,消息解码,最后交给RPC的请求handler
拿到rpc请求消息之后,根据消息信息得到接口,根据接口再得到真的实现对象;找到要调用的方法;然后反射进行调用;最后根据成功还是异常来将结果放入response响应消息中,响应消息通过ctx来返回。
响应消息又会经过服务器端的出战处理,经过消息编码,记录日志,然后发给客户端
客户端再做入站处理,对消息做黏包半包处理,记录日志,消息解码,最后交给了客户端的RPC的Handler
客户端的rpc handler最后把消息打印
4)客户端 handler 第一版
1 2 3 4 5 6 7 8 @Slf4j @ChannelHandler .Sharablepublic class RpcResponseMessageHandler extends SimpleChannelInboundHandler <RpcResponseMessage> { @Override protected void channelRead0 (ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception { log.debug("{}" , msg); } }
5)客户端代码 第二版
包括 channel 管理,代理,接收结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 package cn.itcast.client.Rpc;import cn.itcast.message.Rpc.RpcRequestMessage;import cn.itcast.protocol.MessageCodecSharable;import cn.itcast.protocol.ProcotolFrameDecoder;import cn.itcast.protocol.SequenceIdGenerator;import cn.itcast.server.handler.Rpc.RpcResponseMessageHandler;import cn.itcast.server.service.HelloService;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;import io.netty.util.concurrent.DefaultPromise;import lombok.extern.slf4j.Slf4j;import java.lang.reflect.Proxy;@Slf4j public class RpcClientManager { private static Channel channel = null ; public static void main (String[] args) { HelloService service = getProxyService(HelloService.class); System.out.println(service.sayHello("zhangsan" )); System.out.println(service.sayHello("lisi" )); System.out.println(service.sayHello("wangwu" )); } public static <T> T getProxyService (Class<T> serviceClass) { ClassLoader loader = serviceClass.getClassLoader(); Class<?>[] interfaces = new Class []{serviceClass}; Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> { int sequenceId = SequenceIdGenerator.nextId(); RpcRequestMessage msg = new RpcRequestMessage ( sequenceId, serviceClass.getName(), method.getName(), method.getReturnType(), method.getParameterTypes(), args ); getChannel().writeAndFlush(msg); DefaultPromise<Object> promise = new DefaultPromise <>(getChannel().eventLoop()); RpcResponseMessageHandler.PROMISES.put(sequenceId, promise); promise.await(); if (promise.isSuccess()) { return promise.getNow(); } else { throw new RuntimeException (promise.cause()); } }); return (T) o; } private static final Object LOCK = new Object (); public static Channel getChannel () { if (channel != null ) { return channel; } initChannel(); return channel; } private static void initChannel () { NioEventLoopGroup group = new NioEventLoopGroup (); LoggingHandler LOGGING_HANDLER = new LoggingHandler (LogLevel.DEBUG); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable (); RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler (); Bootstrap bootstrap = new Bootstrap (); bootstrap.channel(NioSocketChannel.class); bootstrap.group(group); bootstrap.handler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProcotolFrameDecoder ()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); ch.pipeline().addLast(RPC_HANDLER); } }); try { channel = bootstrap.connect("localhost" , 8080 ).sync().channel(); channel.closeFuture().addListener(future -> { group.shutdownGracefully(); }); } catch (Exception e) { log.error("client error" , e); } } }
6)客户端 handler 第二版
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import cn.itcast.message.Rpc.RpcResponseMessage;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.util.concurrent.Promise;import lombok.extern.slf4j.Slf4j;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;@Slf4j @ChannelHandler .Sharablepublic class RpcResponseMessageHandler extends SimpleChannelInboundHandler <RpcResponseMessage> { public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap <>(); @Override protected void channelRead0 (ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception { log.debug("{}" , msg); Promise<Object> promise = PROMISES.remove(msg.getSequenceId()); if (promise != null ) { Object returnValue = msg.getReturnValue(); Exception exceptionValue = msg.getExceptionValue(); if (exceptionValue != null ) { promise.setFailure(exceptionValue); } else { promise.setSuccess(returnValue); } } } }
首先,消息的发送方将消息对象发送出去,但是一时半会结果回不来那么快,所以需要一个Promise对象来接收消息,而Promise对象放在了PROMISES集合当中,Promise对象调用同步方法来等结果;这是就到了RpcResonseMessageHandler,假如它接收到了服务器端返回来的消息,它就根据消息队列号来从Promise集合中取出还未填充结果的promise,如果promise != null,就判断结果中是正常还是异常(根据exceptionValue,如果不为null就证明有异常,否则没有异常),而无论是否有异常,都会让同步等待结果中的 await() 结束等待,恢复运行,然后再通过.isSuccess()来判断结果是否异常(因为await无论结果成功失败它都不会抛异常的)
我悟了!代理类创建好容器放入map中。handler处理完毕将结果放入map中。main线程从map中取
2. 源码分析
2.1 启动剖析
我们就来看看 netty 中对下面的代码是怎样进行处理的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 Selector selector = Selector.open(); NioServerSocketChannel attachment = new NioServerSocketChannel ();ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false ); SelectionKey selectionKey = serverSocketChannel.register(selector, 0 , attachment);serverSocketChannel.bind(new InetSocketAddress (8080 )); selectionKey.interestOps(SelectionKey.OP_ACCEPT);
入口 io.netty.bootstrap.ServerBootstrap#bind
关键代码 io.netty.bootstrap.AbstractBootstrap#doBind
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private ChannelFuture doBind (final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null ) { return regFuture; } if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise (channel); regFuture.addListener(new ChannelFutureListener () { @Override public void operationComplete (ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null ) { promise.setFailure(cause); } else { promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
关键代码 io.netty.bootstrap.AbstractBootstrap#initAndRegister
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 final ChannelFuture initAndRegister () { Channel channel = null ; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { return new DefaultChannelPromise (new FailedChannel (), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null ) { } return regFuture; }
关键代码 io.netty.bootstrap.ServerBootstrap#init
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 void init (Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0 )); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0 )); } p.addLast(new ChannelInitializer <Channel>() { @Override public void initChannel (final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null ) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable () { @Override public void run () { pipeline.addLast(new ServerBootstrapAcceptor ( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#register
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public final void register (EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this .eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable () { @Override public void run () { register0(promise); } }); } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private void register0 (ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return ; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false ; registered = true ; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
关键代码 io.netty.channel.ChannelInitializer#initChannel
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private boolean initChannel (ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { try { initChannel((C) ctx.channel()); } catch (Throwable cause) { exceptionCaught(ctx, cause); } finally { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this ) != null ) { pipeline.remove(this ); } } return true ; } return false ; }
关键代码 io.netty.bootstrap.AbstractBootstrap#doBind0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private static void doBind0 ( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable () { @Override public void run () { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#bind
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public final void bind (final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop(); if (!promise.setUncancellable() || !ensureOpen(promise)) { return ; } if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { } boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return ; } if (!wasActive && isActive()) { invokeLater(new Runnable () { @Override public void run () { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }
3.3 关键代码 io.netty.channel.socket.nio.NioServerSocketChannel#doBind
1 2 3 4 5 6 7 protected void doBind (SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7 ) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
3.4 关键代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
1 2 3 4 5 public void channelActive (ChannelHandlerContext ctx) { ctx.fireChannelActive(); readIfIsAutoRead(); }
关键代码 io.netty.channel.nio.AbstractNioChannel#doBeginRead
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 protected void doBeginRead () throws Exception { final SelectionKey selectionKey = this .selectionKey; if (!selectionKey.isValid()) { return ; } readPending = true ; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0 ) { selectionKey.interestOps(interestOps | readInterestOp); } }
2.2 NioEventLoop 剖析
NioEventLoop 线程不仅要处理 IO 事件,还要处理 Task(包括普通任务和定时任务),
提交任务代码 io.netty.util.concurrent.SingleThreadEventExecutor#execute
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public void execute (Runnable task) { if (task == null ) { throw new NullPointerException ("task" ); } boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { startThread(); if (isShutdown()) { } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
唤醒 select 阻塞线程io.netty.channel.nio.NioEventLoop#wakeup
1 2 3 4 5 6 @Override protected void wakeup (boolean inEventLoop) { if (!inEventLoop && wakenUp.compareAndSet(false , true )) { selector.wakeup(); } }
启动 EventLoop 主循环 io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private void doStartThread () { assert thread == null ; executor.execute(new Runnable () { @Override public void run () { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false ; updateLastExecutionTime(); try { SingleThreadEventExecutor.this .run(); success = true ; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: " , t); } finally { } } }); }
io.netty.channel.nio.NioEventLoop#run
主要任务是执行死循环,不断看有没有新任务,有没有 IO 事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 protected void run () { for (;;) { try { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue ; case SelectStrategy.BUSY_WAIT: case SelectStrategy.SELECT: boolean oldWakenUp = wakenUp.getAndSet(false ); select(oldWakenUp); if (wakenUp.get()) { selector.wakeup(); } default : } } catch (IOException e) { rebuildSelector0(); handleLoopException(e); continue ; } cancelledKeys = 0 ; needsToSelectAgain = false ; final int ioRatio = this .ioRatio; if (ioRatio == 100 ) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return ; } } } catch (Throwable t) { handleLoopException(t); } } }
⚠️ 注意
这里有个费解的地方就是 wakeup,它既可以由提交任务的线程来调用(比较好理解),也可以由 EventLoop 线程来调用(比较费解),这里要知道 wakeup 方法的效果:
由非 EventLoop 线程调用,会唤醒当前在执行 select 阻塞的 EventLoop 线程
由 EventLoop 自己调用,会本次的 wakeup 会取消下一次的 select 操作
参考下图
io.netty.channel.nio.NioEventLoop#select
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 private void select (boolean oldWakenUp) throws IOException { Selector selector = this .selector; try { int selectCnt = 0 ; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L ) / 1000000L ; if (timeoutMillis <= 0 ) { if (selectCnt == 0 ) { selector.selectNow(); selectCnt = 1 ; } break ; } if (hasTasks() && wakenUp.compareAndSet(false , true )) { selector.selectNow(); selectCnt = 1 ; break ; } int selectedKeys = selector.select(timeoutMillis); selectCnt ++; if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break ; } if (Thread.interrupted()) { selectCnt = 1 ; break ; } long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { selectCnt = 1 ; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { selector = selectRebuildSelector(selectCnt); selectCnt = 1 ; break ; } currentTimeNanos = time; } if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { } } catch (CancelledKeyException e) { } }
处理 keys io.netty.channel.nio.NioEventLoop#processSelectedKeys
1 2 3 4 5 6 7 8 9 private void processSelectedKeys () { if (selectedKeys != null ) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
io.netty.channel.nio.NioEventLoop#processSelectedKey
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private void processSelectedKey (SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { return ; } try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0 ) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & SelectionKey.OP_WRITE) != 0 ) { ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0 ) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
2.3 accept 剖析
nio 中如下代码,在 netty 中的流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); if (key.isAcceptable()) { SocketChannel channel = serverSocketChannel.accept(); channel.configureBlocking(false ); channel.register(selector, SelectionKey.OP_READ); } }
先来看可接入事件处理(accept)
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 public void read () { assert eventLoop () .inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false ; Throwable exception = null ; try { try { do { int localRead = doReadMessages(readBuf); if (localRead == 0 ) { break ; } if (localRead < 0 ) { closed = true ; break ; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0 ; i < size; i ++) { readPending = false ; pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null ) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true ; if (isOpen()) { close(voidPromise()); } } } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }
关键代码 io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public void channelRead (ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener () { @Override public void operationComplete (ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
又回到了熟悉的 io.netty.channel.AbstractChannel.AbstractUnsafe#register
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public final void register (EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this .eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable () { @Override public void run () { register0(promise); } }); } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private void register0 (ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return ; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false ; registered = true ; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
回到了熟悉的代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
1 2 3 4 5 public void channelActive (ChannelHandlerContext ctx) { ctx.fireChannelActive(); readIfIsAutoRead(); }
io.netty.channel.nio.AbstractNioChannel#doBeginRead
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 protected void doBeginRead () throws Exception { final SelectionKey selectionKey = this .selectionKey; if (!selectionKey.isValid()) { return ; } readPending = true ; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0 ) { selectionKey.interestOps(interestOps | readInterestOp); } }
2.4 read 剖析
再来看可读事件 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
,注意发送的数据未必能够一次读完,因此会触发多次 nio read 事件,一次事件内会触发多次 pipeline read,一次事件会触发一次 pipeline read complete
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public final void read () { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return ; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null ; boolean close = false ; try { do { byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0 ) { byteBuf.release(); byteBuf = null ; close = allocHandle.lastBytesRead() < 0 ; if (close) { readPending = false ; } break ; } allocHandle.incMessagesRead(1 ); readPending = false ; pipeline.fireChannelRead(byteBuf); byteBuf = null ; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }
io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#continueReading(io.netty.util.UncheckedBooleanSupplier)
1 2 3 4 5 6 7 8 9 10 11 12 public boolean continueReading (UncheckedBooleanSupplier maybeMoreDataSupplier) { return config.isAutoRead() && (!respectMaybeMoreData || maybeMoreDataSupplier.get()) && totalMessages < maxMessagePerRead && totalBytesRead > 0 ; }