四. 优化与源码 
  1. 优化 
  1.1 扩展序列化算法 
序列化,反序列化主要用在消息正文的转换上
序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[]) 
反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理 
 
目前的代码仅支持 Java 自带的序列化,反序列化机制,核心代码如下
// Deserialization: copy the message body out of the buffer, then rebuild the
// Message object with the JDK's built-in object serialization.
byte[] body = new byte[bodyLength];
// NOTE(review): `byteByf` looks like a typo for `byteBuf` — confirm against the real codec.
byteByf.readBytes(body);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
Message message = (Message) in.readObject();
message.setSequenceId(sequenceId);

// Serialization: write the Message into an in-memory stream and take the bytes.
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(message);
byte[] bytes = out.toByteArray();
 
为了支持更多序列化算法,抽象一个 Serializer 接口
/**
 * Abstraction over the algorithm that converts message objects to and from bytes,
 * so the codec is not tied to JDK serialization.
 */
public interface Serializer {

    /**
     * Rebuilds an object from its serialized form.
     *
     * @param clazz the type the caller expects to get back
     * @param bytes the serialized representation
     * @return the deserialized object
     */
    <T> T deserialize(Class<T> clazz, byte[] bytes);

    /**
     * Converts an object into its serialized form.
     *
     * @param object the object to serialize
     * @return the serialized bytes
     */
    <T> byte[] serialize(T object);
}
 
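提供两个实现,我这里直接将实现加入了枚举类 Serializer.Algorithm 中(注意:下面的示例代码把该枚举写成了顶层枚举 SerializerAlgorithm,而后文的 Config 与编解码器引用的是 Serializer.Algorithm,二者指的是同一个枚举,实际使用时需统一为同一个名字)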
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 enum  SerializerAlgorithm  implements  Serializer  {	     Java {         @Override          public  <T> T deserialize (Class<T> clazz, byte [] bytes)  {             try  {                 ObjectInputStream  in  =                       new  ObjectInputStream (new  ByteArrayInputStream (bytes));                 Object  object  =  in.readObject();                 return  (T) object;             } catch  (IOException | ClassNotFoundException e) {                 throw  new  RuntimeException ("SerializerAlgorithm.Java 反序列化错误" , e);             }         }         @Override          public  <T> byte [] serialize(T object) {             try  {                 ByteArrayOutputStream  out  =  new  ByteArrayOutputStream ();                 new  ObjectOutputStream (out).writeObject(object);                 return  out.toByteArray();             } catch  (IOException e) {                 throw  new  RuntimeException ("SerializerAlgorithm.Java 序列化错误" , e);             }         }     },           Json {         @Override          public  <T> T deserialize (Class<T> clazz, byte [] bytes)  {             return  new  Gson ().fromJson(new  String (bytes, StandardCharsets.UTF_8), clazz);         }         @Override          public  <T> byte [] serialize(T object) {             return  new  Gson ().toJson(object).getBytes(StandardCharsets.UTF_8);         }     };          public  static  SerializerAlgorithm getByInt (int  type)  {         SerializerAlgorithm[] array = SerializerAlgorithm.values();         if  (type < 0  || type > array.length - 1 ) {             throw  new  IllegalArgumentException ("超过 SerializerAlgorithm 范围" );         }         return  array[type];     } } 
 
增加配置类和配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public  abstract  class  Config  {    static  Properties properties;     static  {         try  (InputStream  in  =  Config.class.getResourceAsStream("/application.properties" )) {             properties = new  Properties ();             properties.load(in);         } catch  (IOException e) {             throw  new  ExceptionInInitializerError (e);         }     }     public  static  int  getServerPort ()  {         String  value  =  properties.getProperty("server.port" );         if (value == null ) {             return  8080 ;         } else  {             return  Integer.parseInt(value);         }     }     public  static  Serializer.Algorithm getSerializerAlgorithm ()  {         String  value  =  properties.getProperty("serializer.algorithm" );         if (value == null ) {             return  Serializer.Algorithm.Java;         } else  {             return  Serializer.Algorithm.valueOf(value);         }     } } 
 
配置文件
# Name of the serializer constant the codec should use (read by Config.getSerializerAlgorithm)
serializer.algorithm =Json
 
修改编解码器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public  class  MessageCodecSharable  extends  MessageToMessageCodec <ByteBuf, Message> {    @Override      public  void  encode (ChannelHandlerContext ctx, Message msg, List<Object> outList)  throws  Exception {         ByteBuf  out  =  ctx.alloc().buffer();                  out.writeBytes(new  byte []{1 , 2 , 3 , 4 });                  out.writeByte(1 );                  out.writeByte(Config.getSerializerAlgorithm().ordinal());                  out.writeByte(msg.getMessageType());                  out.writeInt(msg.getSequenceId());                  out.writeByte(0xff );                  byte [] bytes = Config.getSerializerAlgorithm().serialize(msg);                  out.writeInt(bytes.length);                  out.writeBytes(bytes);         outList.add(out);     }     @Override      protected  void  decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out)  throws  Exception {         int  magicNum  =  in.readInt();         byte  version  =  in.readByte();         byte  serializerAlgorithm  =  in.readByte();          byte  messageType  =  in.readByte();          int  sequenceId  =  in.readInt();         in.readByte();         int  length  =  in.readInt();         byte [] bytes = new  byte [length];         in.readBytes(bytes, 0 , length);                  Serializer.Algorithm  algorithm  =  Serializer.Algorithm.values()[serializerAlgorithm];                  Class<? extends  Message > messageClass = Message.getMessageClass(messageType);         Message  message  =  algorithm.deserialize(messageClass, bytes);         out.add(message);     } } 
 
其中确定具体消息类型,可以根据 消息类型字节 获取到对应的 消息 class
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 @Data public  abstract  class  Message  implements  Serializable  {         public  static  Class<? extends  Message > getMessageClass(int  messageType) {         return  messageClasses.get(messageType);     }     private  int  sequenceId;     private  int  messageType;     public  abstract  int  getMessageType () ;     public  static  final  int  LoginRequestMessage  =  0 ;     public  static  final  int  LoginResponseMessage  =  1 ;     public  static  final  int  ChatRequestMessage  =  2 ;     public  static  final  int  ChatResponseMessage  =  3 ;     public  static  final  int  GroupCreateRequestMessage  =  4 ;     public  static  final  int  GroupCreateResponseMessage  =  5 ;     public  static  final  int  GroupJoinRequestMessage  =  6 ;     public  static  final  int  GroupJoinResponseMessage  =  7 ;     public  static  final  int  GroupQuitRequestMessage  =  8 ;     public  static  final  int  GroupQuitResponseMessage  =  9 ;     public  static  final  int  GroupChatRequestMessage  =  10 ;     public  static  final  int  GroupChatResponseMessage  =  11 ;     public  static  final  int  GroupMembersRequestMessage  =  12 ;     public  static  final  int  GroupMembersResponseMessage  =  13 ;     public  static  final  int  PingMessage  =  14 ;     public  static  final  int  PongMessage  =  15 ;     private  static  final  Map<Integer, Class<? 
extends  Message >> messageClasses = new  HashMap <>();     static  {         messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);         messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);         messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);         messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);         messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);         messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);         messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);         messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);         messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);         messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);         messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);         messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);         messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);         messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);     } } 
 
  1.2 参数调优 
  1)CONNECT_TIMEOUT_MILLIS 
属于 SocketChannel 参数 
用在客户端 建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常 
SO_TIMEOUT 主要用在阻塞 IO,阻塞 IO 中 accept,read 等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间(但是对于netty来说是不需要的,因为netty中的accept、read是非阻塞的) 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Slf4j public  class  TestConnectionTimeout  {    public  static  void  main (String[] args)  {         NioEventLoopGroup  group  =  new  NioEventLoopGroup ();         try  {             Bootstrap  bootstrap  =  new  Bootstrap ()                     .group(group)                     .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300 )                     .channel(NioSocketChannel.class)                     .handler(new  LoggingHandler ());             ChannelFuture  future  =  bootstrap.connect("127.0.0.1" , 8080 );             future.sync().channel().closeFuture().sync();          } catch  (Exception e) {             e.printStackTrace();             log.debug("timeout" );         } finally  {             group.shutdownGracefully();         }     } } 
 
另外源码部分 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect
// Excerpt of AbstractNioUnsafe#connect showing only the connect-timeout handling;
// the rest of the method body is omitted from this listing.
@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    // ... (omitted)

    // Read the configured connect timeout; a value <= 0 schedules no timeout task.
    int connectTimeoutMillis = config().getConnectTimeoutMillis();
    if (connectTimeoutMillis > 0) {
        // Schedule a task on the channel's event loop that runs after the timeout elapses.
        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                ConnectTimeoutException cause =
                    new ConnectTimeoutException("connection timed out: " + remoteAddress);
                // Fail the pending connect promise; only if that succeeds is the channel closed.
                if (connectPromise != null && connectPromise.tryFailure(cause)) {
                    close(voidPromise());
                }
            }
        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    // ... (omitted)
}
 
  2)SO_BACKLOG 
属于 ServerSocketChannel 参数 
 
sequenceDiagram

participant c as client
participant s as server
participant sq as sync queue
participant aq as accept queue

s ->> s : bind()
s ->> s : listen()
c ->> c : connect()
c ->> s : 1. SYN
Note left of c : SYN_SENT
s ->> sq : put
Note right of s : SYN_RCVD
s ->> c : 2. SYN + ACK
Note left of c : ESTABLISHED
c ->> s : 3. ACK
sq ->> aq : put
Note right of s : ESTABLISHED
aq -->> s : 
s ->> s : accept()
 
第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SENT,server 收到,状态改变为 SYN_RCVD,并将该请求放入 sync queue 队列 
第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server 
第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue  (完成了三次握手才会进入 accept queue) 
 
ps :  三次握手是发生在 accept 之前的。为什么第三次握手成功后不直接拿去用,而是将请求从 sync queue 放入到 accept queue 中呢?是因为服务器端进行 accept 的能力是有限的,比如客户端连接量特别大,accept 就可能忙不过来了,这时就需要将已经完成三次握手、建立成功的连接信息放入到全连接队列中,服务器就可以从容不迫地 accept 了。
其中
netty 中
可以通过 .option(ChannelOption.SO_BACKLOG, 值) 来设置大小
可以通过下面源码查看默认大小
// Excerpt: only the field holding the default backlog is shown.
public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
                                              implements ServerSocketChannelConfig {

    // The backlog defaults to NetUtil.SOMAXCONN; follow that constant to see the actual number.
    private volatile int backlog = NetUtil.SOMAXCONN;

    // ... (omitted)
}
 
课堂调试关键断点为:io.netty.channel.nio.NioEventLoop#processSelectedKey
oio 中更容易说明,不用 debug 模式
/**
 * Blocking-IO demo for the accept queue: listens on port 8888 with a backlog of 2,
 * accepts exactly one connection, then waits for console input so that further
 * clients stay queued and are never accepted.
 */
public class Server {
    public static void main(String[] args) throws IOException {
        // try-with-resources closes both sockets when the demo ends
        try (ServerSocket ss = new ServerSocket(8888, 2);
             Socket accept = ss.accept()) {
            System.out.println(accept);
            // Keep the process (and the listening socket) alive until a key is pressed.
            System.in.read();
        }
    }
}
 
客户端启动 4 个
/**
 * Blocking-IO demo client: connects to localhost:8888 with a 1 second connect timeout,
 * sends one byte and then waits for console input so the connection stays open.
 */
public class Client {
    public static void main(String[] args) throws IOException {
        // try-with-resources closes the socket on both the success and the failure path
        try (Socket s = new Socket()) {
            System.out.println(new Date() + " connecting...");
            s.connect(new InetSocketAddress("localhost", 8888), 1000);
            System.out.println(new Date() + " connected...");
            s.getOutputStream().write(1);
            // Hold the connection open until a key is pressed.
            System.in.read();
        } catch (IOException e) {
            System.out.println(new Date() + " connecting timeout...");
            e.printStackTrace();
        }
    }
}
 
第 1,2,3 个客户端都打印,但除了第一个处于 accept 外,其它两个都处于 accept queue 中
Tue Apr 21 20:30:28 CST 2020 connecting...
Tue Apr 21 20:30:28 CST 2020 connected...
 
第 4 个客户端连接时
Tue Apr 21 20:53:58 CST 2020 connecting...
Tue Apr 21 20:53:59 CST 2020 connecting timeout...
java.net.SocketTimeoutException: connect timed out
 
查看某一变量的默认值的思路:
笔记:find usage可查看变量在什么地方被引用 ,然后沿着这个变量的赋值量去找到它初始赋值的位置,Idea右侧光亮处表示当前被选中的变量被使用
  3)ulimit -n  (允许一个进程能够同时打开文件描述符的数量) 
  4)TCP_NODELAY  () 
  5)SO_SNDBUF & SO_RCVBUF 
SO_SNDBUF 属于 SocketChannel 参数 
SO_RCVBUF 既可用于 SocketChannel 参数,也可以用于 ServerSocketChannel 参数(建议设置到 ServerSocketChannel 上) 
 
  6)ALLOCATOR 
属于 SocketChannel 参数 
用来分配 ByteBuf, ctx.alloc() 
 
  7)RCVBUF_ALLOCATOR 
属于 SocketChannel 参数 
控制 netty 接收缓冲区大小 
负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定 
 
  1.3 RPC 框架 
  1)准备工作 
这些代码可以认为是现成的,无需从头编写练习
为了简化起见,在原来聊天项目的基础上新增 Rpc 请求和响应消息
// Excerpt of the Message base class showing only the additions made for RPC;
// the existing members are omitted from this listing.
@Data
public abstract class Message implements Serializable {

    // ... (existing members omitted)

    // Type code of an RPC request message
    public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    // Type code of an RPC response message
    public static final int  RPC_MESSAGE_TYPE_RESPONSE = 102;

    static {
        // ... (existing registrations omitted)

        // Register the RPC message classes so the codec can resolve them by type code.
        messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
        messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }
}
 
请求消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Getter @ToString(callSuper = true) public  class  RpcRequestMessage  extends  Message  {         private  String interfaceName;          private  String methodName;          private  Class<?> returnType;          private  Class[] parameterTypes;          private  Object[] parameterValue;     public  RpcRequestMessage (int  sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue)  {         super .setSequenceId(sequenceId);         this .interfaceName = interfaceName;         this .methodName = methodName;         this .returnType = returnType;         this .parameterTypes = parameterTypes;         this .parameterValue = parameterValue;     }     @Override      public  int  getMessageType ()  {         return  RPC_MESSAGE_TYPE_REQUEST;     } } 
 
响应消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Data @ToString(callSuper = true) public  class  RpcResponseMessage  extends  Message  {         private  Object returnValue;          private  Exception exceptionValue;     @Override      public  int  getMessageType ()  {         return  RPC_MESSAGE_TYPE_RESPONSE;     } } 
 
服务器架子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Slf4j public  class  RpcServer  {    public  static  void  main (String[] args)  {         NioEventLoopGroup  boss  =  new  NioEventLoopGroup ();         NioEventLoopGroup  worker  =  new  NioEventLoopGroup ();         LoggingHandler  LOGGING_HANDLER  =  new  LoggingHandler (LogLevel.DEBUG);         MessageCodecSharable  MESSAGE_CODEC  =  new  MessageCodecSharable ();                           RpcRequestMessageHandler  RPC_HANDLER  =  new  RpcRequestMessageHandler ();         try  {             ServerBootstrap  serverBootstrap  =  new  ServerBootstrap ();             serverBootstrap.channel(NioServerSocketChannel.class);             serverBootstrap.group(boss, worker);             serverBootstrap.childHandler(new  ChannelInitializer <SocketChannel>() {                 @Override                  protected  void  initChannel (SocketChannel ch)  throws  Exception {                     ch.pipeline().addLast(new  ProcotolFrameDecoder ());                     ch.pipeline().addLast(LOGGING_HANDLER);                     ch.pipeline().addLast(MESSAGE_CODEC);                     ch.pipeline().addLast(RPC_HANDLER);                 }             });             Channel  channel  =  serverBootstrap.bind(8080 ).sync().channel();             channel.closeFuture().sync();         } catch  (InterruptedException e) {             log.error("server error" , e);         } finally  {             boss.shutdownGracefully();             worker.shutdownGracefully();         }     } } 
 
客户端架子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public  class  RpcClient  {    public  static  void  main (String[] args)  {         NioEventLoopGroup  group  =  new  NioEventLoopGroup ();         LoggingHandler  LOGGING_HANDLER  =  new  LoggingHandler (LogLevel.DEBUG);         MessageCodecSharable  MESSAGE_CODEC  =  new  MessageCodecSharable ();                           RpcResponseMessageHandler  RPC_HANDLER  =  new  RpcResponseMessageHandler ();         try  {             Bootstrap  bootstrap  =  new  Bootstrap ();             bootstrap.channel(NioSocketChannel.class);             bootstrap.group(group);             bootstrap.handler(new  ChannelInitializer <SocketChannel>() {                 @Override                  protected  void  initChannel (SocketChannel ch)  throws  Exception {                     ch.pipeline().addLast(new  ProcotolFrameDecoder ());                     ch.pipeline().addLast(LOGGING_HANDLER);                     ch.pipeline().addLast(MESSAGE_CODEC);                     ch.pipeline().addLast(RPC_HANDLER);                 }             });             Channel  channel  =  bootstrap.connect("localhost" , 8080 ).sync().channel();             channel.closeFuture().sync();         } catch  (Exception e) {             log.error("client error" , e);         } finally  {             group.shutdownGracefully();         }     } } 
 
服务器端的 service 获取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public  class  ServicesFactory  {    static  Properties properties;     static  Map<Class<?>, Object> map = new  ConcurrentHashMap <>();     static  {         try  (InputStream  in  =  Config.class.getResourceAsStream("/application.properties" )) {             properties = new  Properties ();             properties.load(in);             Set<String> names = properties.stringPropertyNames();             for  (String name : names) {                 if  (name.endsWith("Service" )) {                     Class<?> interfaceClass = Class.forName(name);                     Class<?> instanceClass = Class.forName(properties.getProperty(name));                     map.put(interfaceClass, instanceClass.newInstance());                 }             }         } catch  (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {             throw  new  ExceptionInInitializerError (e);         }     }     public  static  <T> T getService (Class<T> interfaceClass)  {         return  (T) map.get(interfaceClass);     } } 
 
相关配置 application.properties
# Serializer used by the message codec
serializer.algorithm=Json
# Service registration: <interface class name>=<implementation class name>
# (ServicesFactory picks up every key that ends with "Service")
cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl
 
  2)服务器 handler 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Slf4j @ChannelHandler .Sharablepublic  class  RpcRequestMessageHandler  extends  SimpleChannelInboundHandler <RpcRequestMessage> {    @Override      protected  void  channelRead0 (ChannelHandlerContext ctx, RpcRequestMessage message)  {         RpcResponseMessage  response  =  new  RpcResponseMessage ();         response.setSequenceId(message.getSequenceId());         try  {                          HelloService  service  =  (HelloService)                     ServicesFactory.getService(Class.forName(message.getInterfaceName()));                                       Method  method  =  service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());                                       Object  invoke  =  method.invoke(service, message.getParameterValue());                          response.setReturnValue(invoke);         } catch  (Exception e) {             e.printStackTrace();                          response.setExceptionValue(e);         }                  ctx.writeAndFlush(response);     } } 
 
  3)客户端代码第一版 
只发消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 @Slf4j public  class  RpcClient  {    public  static  void  main (String[] args)  {         NioEventLoopGroup  group  =  new  NioEventLoopGroup ();         LoggingHandler  LOGGING_HANDLER  =  new  LoggingHandler (LogLevel.DEBUG);         MessageCodecSharable  MESSAGE_CODEC  =  new  MessageCodecSharable ();         RpcResponseMessageHandler  RPC_HANDLER  =  new  RpcResponseMessageHandler ();         try  {             Bootstrap  bootstrap  =  new  Bootstrap ();             bootstrap.channel(NioSocketChannel.class);             bootstrap.group(group);             bootstrap.handler(new  ChannelInitializer <SocketChannel>() {                 @Override                  protected  void  initChannel (SocketChannel ch)  throws  Exception {                     ch.pipeline().addLast(new  ProcotolFrameDecoder ());                     ch.pipeline().addLast(LOGGING_HANDLER);                     ch.pipeline().addLast(MESSAGE_CODEC);                     ch.pipeline().addLast(RPC_HANDLER);                 }             });             Channel  channel  =  bootstrap.connect("localhost" , 8080 ).sync().channel();             ChannelFuture  future  =  channel.writeAndFlush(new  RpcRequestMessage (                     1 ,                     "cn.itcast.server.service.HelloService" ,                                     "sayHello" ,                     String.class,                     new  Class []{String.class},                     new  Object []{"张三" }             )).addListener(promise -> {                 if  (!promise.isSuccess()) {                     Throwable  cause  =  promise.cause();                     log.error("error" , cause);                 }             });             channel.closeFuture().sync();         } catch  (Exception e) {             log.error("client error" , e);         } finally  {             group.shutdownGracefully();         }   
  } } 
 
理一下流程:
客户端发送请求消息,请求消息就找到 Pipeline 中的出站处理器,从下向上依次执行,比如这里只有两个出站处理器,先通过 MESSAGE_CODEC 处理器对请求消息进行编码,然后记录日志,最后请求消息就发出去了 
 
消息发出之后,服务器端就拿到消息,就进行入站处理,做半包、黏包处理,记录日志,消息解码,最后交给RPC的请求handler 
 
拿到rpc请求消息之后,根据消息信息得到接口,根据接口再得到真的实现对象;找到要调用的方法;然后反射进行调用;最后根据成功还是异常来将结果放入response响应消息中,响应消息通过ctx来返回。 
 
响应消息又会经过服务器端的出站处理,经过消息编码,记录日志,然后发给客户端 
 
客户端再做入站处理,对消息做黏包半包处理,记录日志,消息解码,最后交给了客户端的RPC的Handler 
 
客户端的rpc handler最后把消息打印 
 
  4)客户端 handler 第一版 
1 2 3 4 5 6 7 8 @Slf4j @ChannelHandler .Sharablepublic  class  RpcResponseMessageHandler  extends  SimpleChannelInboundHandler <RpcResponseMessage> {    @Override      protected  void  channelRead0 (ChannelHandlerContext ctx, RpcResponseMessage msg)  throws  Exception {         log.debug("{}" , msg);     } } 
 
  5)客户端代码 第二版 
包括 channel 管理,代理,接收结果
package cn.itcast.client.Rpc;

import cn.itcast.message.Rpc.RpcRequestMessage;
import cn.itcast.protocol.MessageCodecSharable;
import cn.itcast.protocol.ProcotolFrameDecoder;
import cn.itcast.protocol.SequenceIdGenerator;
import cn.itcast.server.handler.Rpc.RpcResponseMessageHandler;
import cn.itcast.server.service.HelloService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultPromise;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.Proxy;

/**
 * RPC client, second version: manages one shared channel, creates dynamic proxies for
 * service interfaces and blocks the calling thread until the matching response arrives.
 */
@Slf4j
public class RpcClientManager {

    /** Shared connection; volatile so the unsynchronized fast path in getChannel() sees it. */
    private static volatile Channel channel = null;

    /** Guards the one-time channel initialization. */
    private static final Object LOCK = new Object();

    public static void main(String[] args) {
        HelloService service = getProxyService(HelloService.class);
        System.out.println(service.sayHello("zhangsan"));
        System.out.println(service.sayHello("lisi"));
        System.out.println(service.sayHello("wangwu"));
    }

    /**
     * Creates a proxy whose method calls are sent to the server as RPC requests.
     *
     * @param serviceClass the service interface to proxy
     * @return a proxy implementing {@code serviceClass}; each call blocks until the
     *         response arrives and throws a RuntimeException wrapping the failure cause
     */
    @SuppressWarnings("unchecked") // the proxy implements serviceClass by construction
    public static <T> T getProxyService(Class<T> serviceClass) {
        ClassLoader loader = serviceClass.getClassLoader();
        Class<?>[] interfaces = new Class[]{serviceClass};

        Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {
            // 1. Turn the method call into a request message.
            int sequenceId = SequenceIdGenerator.nextId();
            RpcRequestMessage msg = new RpcRequestMessage(
                    sequenceId,
                    serviceClass.getName(),
                    method.getName(),
                    method.getReturnType(),
                    method.getParameterTypes(),
                    args
            );

            Channel ch = getChannel();
            if (ch == null) {
                throw new IllegalStateException("rpc channel is not available");
            }

            // 2. Register the promise BEFORE writing: otherwise a fast response could reach
            //    the handler before the promise is in the map and would be dropped,
            //    leaving this thread waiting forever.
            DefaultPromise<Object> promise = new DefaultPromise<>(ch.eventLoop());
            RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);

            // 3. Send the request; a failed write completes the promise exceptionally
            //    instead of leaving the caller blocked.
            ch.writeAndFlush(msg).addListener(writeFuture -> {
                if (!writeFuture.isSuccess()) {
                    RpcResponseMessageHandler.PROMISES.remove(sequenceId);
                    promise.tryFailure(writeFuture.cause());
                }
            });

            // 4. Wait for the response handler to complete the promise
            //    (await does not throw on failure, hence the isSuccess check).
            promise.await();
            if (promise.isSuccess()) {
                return promise.getNow();
            } else {
                throw new RuntimeException(promise.cause());
            }
        });
        return (T) o;
    }

    /**
     * Returns the shared channel, connecting on first use. Initialization runs under
     * LOCK with a second null check so that concurrent callers connect only once.
     *
     * @return the channel, or {@code null} if the connection could not be established
     */
    public static Channel getChannel() {
        Channel current = channel;
        if (current != null) {
            return current;
        }
        synchronized (LOCK) {
            if (channel == null) {
                initChannel();
            }
            return channel;
        }
    }

    /** Connects to localhost:8080 and stores the channel; logs and returns on failure. */
    private static void initChannel() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ProcotolFrameDecoder());
                ch.pipeline().addLast(LOGGING_HANDLER);
                ch.pipeline().addLast(MESSAGE_CODEC);
                ch.pipeline().addLast(RPC_HANDLER);
            }
        });
        try {
            channel = bootstrap.connect("localhost", 8080).sync().channel();
            // Do not block here; release the event loop threads once the channel closes.
            channel.closeFuture().addListener(future -> {
                group.shutdownGracefully();
            });
        } catch (Exception e) {
            log.error("client error", e);
            // The connect failed, so no close listener exists: release the threads now.
            group.shutdownGracefully();
        }
    }
}
 
  6)客户端 handler 第二版 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import  cn.itcast.message.Rpc.RpcResponseMessage;import  io.netty.channel.ChannelHandler;import  io.netty.channel.ChannelHandlerContext;import  io.netty.channel.SimpleChannelInboundHandler;import  io.netty.util.concurrent.Promise;import  lombok.extern.slf4j.Slf4j;import  java.util.Map;import  java.util.concurrent.ConcurrentHashMap;@Slf4j @ChannelHandler .Sharablepublic  class  RpcResponseMessageHandler  extends  SimpleChannelInboundHandler <RpcResponseMessage> {         public  static  final  Map<Integer, Promise<Object>> PROMISES = new  ConcurrentHashMap <>();     @Override      protected  void  channelRead0 (ChannelHandlerContext ctx, RpcResponseMessage msg)  throws  Exception {         log.debug("{}" , msg);                  Promise<Object> promise = PROMISES.remove(msg.getSequenceId());           if  (promise != null ) {             Object  returnValue  =  msg.getReturnValue();             Exception  exceptionValue  =  msg.getExceptionValue();             if  (exceptionValue != null ) {                 promise.setFailure(exceptionValue);             } else  {                 promise.setSuccess(returnValue);             }         }     } } 
 
首先,消息的发送方将消息对象发送出去,但是一时半会结果回不来那么快,所以需要一个 Promise 对象来接收结果,而 Promise 对象放在了 PROMISES 集合当中,Promise 对象调用同步方法来等结果;这时就到了 RpcResponseMessageHandler,假如它接收到了服务器端返回来的消息,它就根据消息序列号(sequenceId)从 PROMISES 集合中取出还未填充结果的 promise,如果 promise != null,就判断结果是正常还是异常(根据 exceptionValue,如果不为 null 就证明有异常,否则没有异常),而无论是否有异常,都会让正在同步等待结果的 await() 结束等待,恢复运行,然后再通过 .isSuccess() 来判断结果是否异常(因为 await 无论结果成功失败都不会抛异常)
我悟了!代理类创建好容器放入map中。handler处理完毕将结果放入map中。main线程从map中取
  2. 源码分析 
  2.1 启动剖析 
我们就来看看 netty 中对下面的代码是怎样进行处理的
// Plain-NIO steps that the following sections trace through Netty's startup code.

// 1. Open a selector.
Selector selector = Selector.open();

// 2. Create the Netty channel object that will be attached to the selection key.
NioServerSocketChannel attachment = new NioServerSocketChannel();

// 3. Open the JDK server socket channel and switch it to non-blocking mode.
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);

// 4. Register with the selector: no interest ops yet (0), Netty channel as attachment.
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);

// 5. Bind the port.
serverSocketChannel.bind(new InetSocketAddress(8080));

// 6. Only now declare interest in accept events.
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
 
入口 io.netty.bootstrap.ServerBootstrap#bind
关键代码 io.netty.bootstrap.AbstractBootstrap#doBind
// Creates and registers the channel, then binds it to localAddress once the
// registration has completed.
private ChannelFuture doBind(final SocketAddress localAddress) {
    // 1. Run initAndRegister(), which returns a future for the registration.
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    // Registration already failed: hand that future back to the caller.
    if (regFuture.cause() != null) {
        return regFuture;
    }

    // 2a. Registration already finished: bind right away.
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    }
    // 2b. Registration still pending: bind from a listener once it completes.
    else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration failed: propagate the failure to the bind promise.
                    promise.setFailure(cause);
                } else {
                    promise.registered();
                    // Registration succeeded: perform the bind.
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
 
关键代码 io.netty.bootstrap.AbstractBootstrap#initAndRegister
// Creates the channel through the channel factory, initializes it, and registers it
// with the configured event loop group; returns the registration future.
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 1. Create the channel.
        channel = channelFactory.newChannel();
        // 2. Initialize it (see ServerBootstrap#init).
        init(channel);
    } catch (Throwable t) {
        // Creation or initialization failed: return an already-failed promise.
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // 3. Register the channel with the group.
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        // ... (failure handling omitted in this excerpt)
    }
    return regFuture;
}
 
关键代码 io.netty.bootstrap.ServerBootstrap#init
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 void  init (Channel channel)  throws  Exception {    final  Map<ChannelOption<?>, Object> options = options0();     synchronized  (options) {         setChannelOptions(channel, options, logger);     }     final  Map<AttributeKey<?>, Object> attrs = attrs0();     synchronized  (attrs) {         for  (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {             @SuppressWarnings("unchecked")              AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();             channel.attr(key).set(e.getValue());         }     }     ChannelPipeline  p  =  channel.pipeline();     final  EventLoopGroup  currentChildGroup  =  childGroup;     final  ChannelHandler  currentChildHandler  =  childHandler;     final  Entry<ChannelOption<?>, Object>[] currentChildOptions;     final  Entry<AttributeKey<?>, Object>[] currentChildAttrs;     synchronized  (childOptions) {         currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0 ));     }     synchronized  (childAttrs) {         currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0 ));     } 	          p.addLast(new  ChannelInitializer <Channel>() {         @Override          public  void  initChannel (final  Channel ch)  throws  Exception {             final  ChannelPipeline  pipeline  =  ch.pipeline();             ChannelHandler  handler  =  config.handler();             if  (handler != null ) {                 pipeline.addLast(handler);             }                          ch.eventLoop().execute(new  Runnable () {                 @Override                  public  void  run ()  {                     pipeline.addLast(new  ServerBootstrapAcceptor (                             ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));                 }             });         }     }); } 
 
关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#register
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public  final  void  register (EventLoop eventLoop, final  ChannelPromise promise)  {         AbstractChannel.this .eventLoop = eventLoop;     if  (eventLoop.inEventLoop()) {         register0(promise);     } else  {         try  {                                                    eventLoop.execute(new  Runnable () {                 @Override                  public  void  run ()  {                     register0(promise);                 }             });         } catch  (Throwable t) {                          closeForcibly();             closeFuture.setClosed();             safeSetFailure(promise, t);         }     } } 
 
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private  void  register0 (ChannelPromise promise)  {    try  {         if  (!promise.setUncancellable() || !ensureOpen(promise)) {             return ;         }         boolean  firstRegistration  =  neverRegistered;                  doRegister();         neverRegistered = false ;         registered = true ;                  pipeline.invokeHandlerAddedIfNeeded();                  safeSetSuccess(promise);         pipeline.fireChannelRegistered();                           if  (isActive()) {             if  (firstRegistration) {                 pipeline.fireChannelActive();             } else  if  (config().isAutoRead()) {                 beginRead();             }         }     } catch  (Throwable t) {                  closeForcibly();         closeFuture.setClosed();         safeSetFailure(promise, t);     } } 
 
关键代码 io.netty.channel.ChannelInitializer#initChannel
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private  boolean  initChannel (ChannelHandlerContext ctx)  throws  Exception {    if  (initMap.add(ctx)) {          try  {                          initChannel((C) ctx.channel());         } catch  (Throwable cause) {             exceptionCaught(ctx, cause);         } finally  {                          ChannelPipeline  pipeline  =  ctx.pipeline();             if  (pipeline.context(this ) != null ) {                 pipeline.remove(this );             }         }         return  true ;     }     return  false ; } 
 
关键代码 io.netty.bootstrap.AbstractBootstrap#doBind0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private  static  void  doBind0 (         final  ChannelFuture regFuture, final  Channel channel,         final  SocketAddress localAddress, final  ChannelPromise promise)  {    channel.eventLoop().execute(new  Runnable () {         @Override          public  void  run ()  {             if  (regFuture.isSuccess()) {                 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);             } else  {                 promise.setFailure(regFuture.cause());             }         }     }); } 
 
关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#bind
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public  final  void  bind (final  SocketAddress localAddress, final  ChannelPromise promise)  {    assertEventLoop();     if  (!promise.setUncancellable() || !ensureOpen(promise)) {         return ;     }     if  (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&         localAddress instanceof  InetSocketAddress &&         !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&         !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {              }     boolean  wasActive  =  isActive();     try  {                  doBind(localAddress);     } catch  (Throwable t) {         safeSetFailure(promise, t);         closeIfClosed();         return ;     }     if  (!wasActive && isActive()) {         invokeLater(new  Runnable () {             @Override              public  void  run ()  {                                  pipeline.fireChannelActive();             }         });     }     safeSetSuccess(promise); } 
 
3.3 关键代码 io.netty.channel.socket.nio.NioServerSocketChannel#doBind
1 2 3 4 5 6 7 protected  void  doBind (SocketAddress localAddress)  throws  Exception {    if  (PlatformDependent.javaVersion() >= 7 ) {         javaChannel().bind(localAddress, config.getBacklog());     } else  {         javaChannel().socket().bind(localAddress, config.getBacklog());     } } 
 
3.4 关键代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
1 2 3 4 5 public  void  channelActive (ChannelHandlerContext ctx)  {    ctx.fireChannelActive(); 	     readIfIsAutoRead(); } 
 
关键代码 io.netty.channel.nio.AbstractNioChannel#doBeginRead
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 protected  void  doBeginRead ()  throws  Exception {         final  SelectionKey  selectionKey  =  this .selectionKey;     if  (!selectionKey.isValid()) {         return ;     }     readPending = true ;     final  int  interestOps  =  selectionKey.interestOps();          if  ((interestOps & readInterestOp) == 0 ) {         selectionKey.interestOps(interestOps | readInterestOp);     } } 
 
  2.2 NioEventLoop 剖析 
NioEventLoop 线程不仅要处理 IO 事件,还要处理 Task(包括普通任务和定时任务)。
提交任务代码 io.netty.util.concurrent.SingleThreadEventExecutor#execute
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public  void  execute (Runnable task)  {    if  (task == null ) {         throw  new  NullPointerException ("task" );     }     boolean  inEventLoop  =  inEventLoop();          addTask(task);     if  (!inEventLoop) {                  startThread();         if  (isShutdown()) {                      }     }     if  (!addTaskWakesUp && wakesUpForTask(task)) {                  wakeup(inEventLoop);     } } 
 
唤醒 select 阻塞线程 io.netty.channel.nio.NioEventLoop#wakeup
1 2 3 4 5 6 @Override protected  void  wakeup (boolean  inEventLoop)  {    if  (!inEventLoop && wakenUp.compareAndSet(false , true )) {         selector.wakeup();     } } 
 
启动 EventLoop 主循环 io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private  void  doStartThread ()  {    assert  thread == null ;     executor.execute(new  Runnable () {         @Override          public  void  run ()  {                          thread = Thread.currentThread();             if  (interrupted) {                 thread.interrupt();             }             boolean  success  =  false ;             updateLastExecutionTime();             try  {                                  SingleThreadEventExecutor.this .run();                 success = true ;             } catch  (Throwable t) {                 logger.warn("Unexpected exception from an event executor: " , t);             } finally  { 				             }         }     }); } 
 
io.netty.channel.nio.NioEventLoop#run 主要任务是执行死循环,不断看有没有新任务,有没有 IO 事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 protected  void  run ()  {    for  (;;) {         try  {             try  {                                                                    switch  (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {                     case  SelectStrategy.CONTINUE:                         continue ;                     case  SelectStrategy.BUSY_WAIT:                     case  SelectStrategy.SELECT:                                                                           boolean  oldWakenUp  =  wakenUp.getAndSet(false );                                                                                                                                                                               select(oldWakenUp);                         if  (wakenUp.get()) {                             selector.wakeup();                         }                     default :                 }             } catch  (IOException e) {                 rebuildSelector0();                 handleLoopException(e);                 continue ;             }             cancelledKeys = 0 ;             needsToSelectAgain = false ;                          final  int  ioRatio  =  this .ioRatio;             if  (ioRatio == 100 ) {                 try  {                     processSelectedKeys();                 } finally  {                                          runAllTasks();                 }             } else  {                                 final  long  ioStartTime  =  System.nanoTime();                 try  {                     processSelectedKeys();                 } finally  {                                          final  long  ioTime  =  System.nanoTime() - ioStartTime;                                          runAllTasks(ioTime * (100  - ioRatio) / ioRatio);               
  }             }         } catch  (Throwable t) {             handleLoopException(t);         }         try  {             if  (isShuttingDown()) {                 closeAll();                 if  (confirmShutdown()) {                     return ;                 }             }         } catch  (Throwable t) {             handleLoopException(t);         }     } } 
 
  ⚠️ 注意 
这里有个费解的地方就是 wakeup,它既可以由提交任务的线程来调用(比较好理解),也可以由 EventLoop 线程来调用(比较费解),这里要知道 wakeup 方法的效果:
由非 EventLoop 线程调用,会唤醒当前在执行 select 阻塞的 EventLoop 线程 
由 EventLoop 自己调用,本次的 wakeup 会取消下一次的 select 操作 
 
 
参考下图
 
io.netty.channel.nio.NioEventLoop#select
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 private  void  select (boolean  oldWakenUp)  throws  IOException {    Selector  selector  =  this .selector;     try  {         int  selectCnt  =  0 ;         long  currentTimeNanos  =  System.nanoTime();                                    long  selectDeadLineNanos  =  currentTimeNanos + delayNanos(currentTimeNanos);         for  (;;) {             long  timeoutMillis  =  (selectDeadLineNanos - currentTimeNanos + 500000L ) / 1000000L ;                          if  (timeoutMillis <= 0 ) {                 if  (selectCnt == 0 ) {                     selector.selectNow();                     selectCnt = 1 ;                 }                 break ;             }                                       if  (hasTasks() && wakenUp.compareAndSet(false , true )) {                 selector.selectNow();                 selectCnt = 1 ;                 break ;             }                                       int  selectedKeys  =  selector.select(timeoutMillis);                          selectCnt ++;                          if  (selectedKeys != 0  || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {                 break ;             }             if  (Thread.interrupted()) {                	                                  selectCnt = 1 ;                 break ;             }             long  time  =  System.nanoTime();             if  (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {                                  selectCnt = 1 ;             }                                        else  if  (SELECTOR_AUTO_REBUILD_THRESHOLD > 0  &&                     selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {                                  selector = selectRebuildSelector(selectCnt);                 selectCnt = 1 ;                 
break ;             }             currentTimeNanos = time;         }         if  (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {                      }     } catch  (CancelledKeyException e) {              } } 
 
处理 keys io.netty.channel.nio.NioEventLoop#processSelectedKeys
1 2 3 4 5 6 7 8 9 private  void  processSelectedKeys ()  {    if  (selectedKeys != null ) {                           processSelectedKeysOptimized();     } else  {         processSelectedKeysPlain(selector.selectedKeys());     } } 
 
io.netty.channel.nio.NioEventLoop#processSelectedKey
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private  void  processSelectedKey (SelectionKey k, AbstractNioChannel ch)  {    final  AbstractNioChannel.NioUnsafe  unsafe  =  ch.unsafe();          if  (!k.isValid()) {                  return ;     }     try  {         int  readyOps  =  k.readyOps();                  if  ((readyOps & SelectionKey.OP_CONNECT) != 0 ) {             int  ops  =  k.interestOps();             ops &= ~SelectionKey.OP_CONNECT;             k.interestOps(ops);             unsafe.finishConnect();         }                  if  ((readyOps & SelectionKey.OP_WRITE) != 0 ) {             ch.unsafe().forceFlush();         }                  if  ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0  || readyOps == 0 ) {                                       unsafe.read();         }     } catch  (CancelledKeyException ignored) {         unsafe.close(unsafe.voidPromise());     } } 
 
  2.3 accept 剖析 
nio 中如下代码,在 netty 中的流程
// Plain-NIO accept handling: block until events arrive, then walk the selected keys.
selector.select();

Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
    SelectionKey key = iter.next();
    // The selector never removes keys from the selected-key set by itself; without this
    // call the same key would be seen again after the next select().
    iter.remove();

    if (key.isAcceptable()) {
        SocketChannel channel = serverSocketChannel.accept();
        // accept() may return null when the server channel is non-blocking and no
        // connection is pending, so guard before touching the result.
        if (channel != null) {
            channel.configureBlocking(false);
            // Ask to be notified when the accepted connection has data to read.
            channel.register(selector, SelectionKey.OP_READ);
        }
    }
}
 
先来看可接入事件处理(accept)
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 public  void  read ()  {    assert  eventLoop () .inEventLoop();     final  ChannelConfig  config  =  config();     final  ChannelPipeline  pipeline  =  pipeline();         final  RecvByteBufAllocator.Handle  allocHandle  =  unsafe().recvBufAllocHandle();     allocHandle.reset(config);     boolean  closed  =  false ;     Throwable  exception  =  null ;     try  {         try  {             do  { 				                                  int  localRead  =  doReadMessages(readBuf);                 if  (localRead == 0 ) {                     break ;                 }                 if  (localRead < 0 ) {                     closed = true ;                     break ;                 } 				                 allocHandle.incMessagesRead(localRead);             } while  (allocHandle.continueReading());         } catch  (Throwable t) {             exception = t;         }         int  size  =  readBuf.size();         for  (int  i  =  0 ; i < size; i ++) {             readPending = false ;                                       pipeline.fireChannelRead(readBuf.get(i));         }         readBuf.clear();         allocHandle.readComplete();         pipeline.fireChannelReadComplete();         if  (exception != null ) {             closed = closeOnReadError(exception);             pipeline.fireExceptionCaught(exception);         }         if  (closed) {             inputShutdown = true ;             if  (isOpen()) {                 close(voidPromise());             }         }     } finally  {         if  (!readPending && !config.isAutoRead()) {             removeReadOp();         }     } } 
 
关键代码 io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public  void  channelRead (ChannelHandlerContext ctx, Object msg)  {         final  Channel  child  =  (Channel) msg;          child.pipeline().addLast(childHandler);          setChannelOptions(child, childOptions, logger);     for  (Entry<AttributeKey<?>, Object> e: childAttrs) {         child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());     }     try  {                  childGroup.register(child).addListener(new  ChannelFutureListener () {             @Override              public  void  operationComplete (ChannelFuture future)  throws  Exception {                 if  (!future.isSuccess()) {                     forceClose(child, future.cause());                 }             }         });     } catch  (Throwable t) {         forceClose(child, t);     } } 
 
又回到了熟悉的 io.netty.channel.AbstractChannel.AbstractUnsafe#register  方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public  final  void  register (EventLoop eventLoop, final  ChannelPromise promise)  {         AbstractChannel.this .eventLoop = eventLoop;     if  (eventLoop.inEventLoop()) {         register0(promise);     } else  {         try  {                          eventLoop.execute(new  Runnable () {                 @Override                  public  void  run ()  {                     register0(promise);                 }             });         } catch  (Throwable t) {                          closeForcibly();             closeFuture.setClosed();             safeSetFailure(promise, t);         }     } } 
 
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private  void  register0 (ChannelPromise promise)  {    try  {         if  (!promise.setUncancellable() || !ensureOpen(promise)) {             return ;         }         boolean  firstRegistration  =  neverRegistered;         doRegister();         neverRegistered = false ;         registered = true ; 		                  pipeline.invokeHandlerAddedIfNeeded();                  safeSetSuccess(promise);         pipeline.fireChannelRegistered();                  if  (isActive()) {             if  (firstRegistration) {                                  pipeline.fireChannelActive();             } else  if  (config().isAutoRead()) {                 beginRead();             }         }     } catch  (Throwable t) {         closeForcibly();         closeFuture.setClosed();         safeSetFailure(promise, t);     } } 
 
回到了熟悉的代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
1 2 3 4 5 public  void  channelActive (ChannelHandlerContext ctx)  {    ctx.fireChannelActive(); 	     readIfIsAutoRead(); } 
 
io.netty.channel.nio.AbstractNioChannel#doBeginRead
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 protected  void  doBeginRead ()  throws  Exception {         final  SelectionKey  selectionKey  =  this .selectionKey;     if  (!selectionKey.isValid()) {         return ;     }     readPending = true ; 	     final  int  interestOps  =  selectionKey.interestOps();     if  ((interestOps & readInterestOp) == 0 ) {                  selectionKey.interestOps(interestOps | readInterestOp);     } } 
 
  2.4 read 剖析 
再来看可读事件 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read,注意发送的数据未必能够一次读完,因此会触发多次 nio read 事件,一次事件内会触发多次 pipeline read,一次事件会触发一次 pipeline read complete
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public  final  void  read ()  {    final  ChannelConfig  config  =  config();     if  (shouldBreakReadReady(config)) {         clearReadPending();         return ;     }     final  ChannelPipeline  pipeline  =  pipeline();          final  ByteBufAllocator  allocator  =  config.getAllocator();          final  RecvByteBufAllocator.Handle  allocHandle  =  recvBufAllocHandle();     allocHandle.reset(config);     ByteBuf  byteBuf  =  null ;     boolean  close  =  false ;     try  {         do  {             byteBuf = allocHandle.allocate(allocator);                          allocHandle.lastBytesRead(doReadBytes(byteBuf));             if  (allocHandle.lastBytesRead() <= 0 ) {                 byteBuf.release();                 byteBuf = null ;                 close = allocHandle.lastBytesRead() < 0 ;                 if  (close) {                     readPending = false ;                 }                 break ;             }             allocHandle.incMessagesRead(1 );             readPending = false ;                          pipeline.fireChannelRead(byteBuf);             byteBuf = null ;         }                   while  (allocHandle.continueReading());         allocHandle.readComplete();                  pipeline.fireChannelReadComplete();         if  (close) {             closeOnRead(pipeline);         }     } catch  (Throwable t) {         handleReadException(pipeline, byteBuf, t, close, allocHandle);     } finally  {         if  (!readPending && !config.isAutoRead()) {             removeReadOp();         }     } } 
 
io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#continueReading(io.netty.util.UncheckedBooleanSupplier)
1 2 3 4 5 6 7 8 9 10 11 12 public  boolean  continueReading (UncheckedBooleanSupplier maybeMoreDataSupplier)  {    return                          config.isAutoRead() &&                                    (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&                        totalMessages < maxMessagePerRead &&                        totalBytesRead > 0 ; }