(Netty)4-优化与源码

四. 优化与源码

1. 优化

1.1 扩展序列化算法

序列化,反序列化主要用在消息正文的转换上

  • 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
  • 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理

目前的代码仅支持 Java 自带的序列化,反序列化机制,核心代码如下

1
2
3
4
5
6
7
8
9
10
11
// 反序列化
byte[] body = new byte[bodyLength];
byteByf.readBytes(body);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
Message message = (Message) in.readObject();
message.setSequenceId(sequenceId);

// 序列化
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(message);
byte[] bytes = out.toByteArray();

为了支持更多序列化算法,抽象一个 Serializer 接口

1
2
3
4
5
6
7
8
9
public interface Serializer {

// 反序列化方法
<T> T deserialize(Class<T> clazz, byte[] bytes);

// 序列化方法
<T> byte[] serialize(T object);

}

提供两个实现,我这里直接将实现加入了枚举类 Serializer.Algorithm 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
enum SerializerAlgorithm implements Serializer {
// Java 实现
Java {
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
try {
ObjectInputStream in =
new ObjectInputStream(new ByteArrayInputStream(bytes));
Object object = in.readObject();
return (T) object;
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("SerializerAlgorithm.Java 反序列化错误", e);
}
}

@Override
public <T> byte[] serialize(T object) {
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(object);
return out.toByteArray();
} catch (IOException e) {
throw new RuntimeException("SerializerAlgorithm.Java 序列化错误", e);
}
}
},
// Json 实现(引入了 Gson 依赖)
Json {
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);
}

@Override
public <T> byte[] serialize(T object) {
return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8);
}
};

// 需要从协议的字节中得到是哪种序列化算法
public static SerializerAlgorithm getByInt(int type) {
SerializerAlgorithm[] array = SerializerAlgorithm.values();
if (type < 0 || type > array.length - 1) {
throw new IllegalArgumentException("超过 SerializerAlgorithm 范围");
}
return array[type];
}
}

增加配置类和配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public abstract class Config {
static Properties properties;
static {
try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
properties = new Properties();
properties.load(in);
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
}
public static int getServerPort() {
String value = properties.getProperty("server.port");
if(value == null) {
return 8080;
} else {
return Integer.parseInt(value);
}
}
public static Serializer.Algorithm getSerializerAlgorithm() {
String value = properties.getProperty("serializer.algorithm");
if(value == null) {
return Serializer.Algorithm.Java;
} else {
return Serializer.Algorithm.valueOf(value);
}
}
}

配置文件

1
serializer.algorithm=Json

修改编解码器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
@Override
public void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
ByteBuf out = ctx.alloc().buffer();
// 1. 4 字节的魔数
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版本,
out.writeByte(1);
// 3. 1 字节的序列化方式 jdk 0 , json 1
out.writeByte(Config.getSerializerAlgorithm().ordinal());
// 4. 1 字节的指令类型
out.writeByte(msg.getMessageType());
// 5. 4 个字节
out.writeInt(msg.getSequenceId());
// 无意义,对齐填充
out.writeByte(0xff);
// 6. 获取内容的字节数组
byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容
out.writeBytes(bytes);
outList.add(out);
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerAlgorithm = in.readByte(); // 0 或 1
byte messageType = in.readByte(); // 0,1,2...
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);

// 找到反序列化算法
Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm];
// 确定具体消息类型
Class<? extends Message> messageClass = Message.getMessageClass(messageType);
Message message = algorithm.deserialize(messageClass, bytes);
// log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
// log.debug("{}", message);
out.add(message);
}
}

其中确定具体消息类型,可以根据 消息类型字节 获取到对应的 消息 class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Data
public abstract class Message implements Serializable {

/**
* 根据消息类型字节,获得对应的消息 class
* @param messageType 消息类型字节
* @return 消息 class
*/
public static Class<? extends Message> getMessageClass(int messageType) {
return messageClasses.get(messageType);
}

private int sequenceId;

private int messageType;

public abstract int getMessageType();

public static final int LoginRequestMessage = 0;
public static final int LoginResponseMessage = 1;
public static final int ChatRequestMessage = 2;
public static final int ChatResponseMessage = 3;
public static final int GroupCreateRequestMessage = 4;
public static final int GroupCreateResponseMessage = 5;
public static final int GroupJoinRequestMessage = 6;
public static final int GroupJoinResponseMessage = 7;
public static final int GroupQuitRequestMessage = 8;
public static final int GroupQuitResponseMessage = 9;
public static final int GroupChatRequestMessage = 10;
public static final int GroupChatResponseMessage = 11;
public static final int GroupMembersRequestMessage = 12;
public static final int GroupMembersResponseMessage = 13;
public static final int PingMessage = 14;
public static final int PongMessage = 15;
private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();

static {
messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
}
}

1.2 参数调优

1)CONNECT_TIMEOUT_MILLIS

  • 属于 SocketChannal 参数
  • 用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常
  • SO_TIMEOUT 主要用在阻塞 IO,阻塞 IO 中 accept,read 等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间(但是对于netty来说是不需要的,因为netty中的accept、read是非阻塞的)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
public class TestConnectionTimeout {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler());
ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
future.sync().channel().closeFuture().sync(); // 断点1
} catch (Exception e) {
e.printStackTrace();
log.debug("timeout");
} finally {
group.shutdownGracefully();
}
}
}

另外源码部分 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// ...
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress); // 断点2
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
// ...
}

2)SO_BACKLOG

  • 属于 ServerSocketChannal 参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
sequenceDiagram

participant c as client
participant s as server
participant sq as syns queue
participant aq as accept queue

s ->> s : bind()
s ->> s : listen()
c ->> c : connect()
c ->> s : 1. SYN
Note left of c : SYN_SEND
s ->> sq : put
Note right of s : SYN_RCVD
s ->> c : 2. SYN + ACK
Note left of c : ESTABLISHED
c ->> s : 3. ACK
sq ->> aq : put
Note right of s : ESTABLISHED
aq -->> s :
s ->> s : accept()
  1. 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
  2. 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
  3. 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue (完成了三次握手才会进入 accept queue)

ps : 三次握手是发生在accept之前的。为什么第三次握手成功后不直接拿去用而是将请求从sync queue当如到 accept queue中呢,是因为服务器端进行accept的能力是有限的,比如客户端连接量特别大,accept就可能忙不过来了,这时就需要将已经完成三次握手的建立成功的信息放入到全连接队列中,服务器就可以从容不迫地accept了。

其中

  • 在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制

  • sync queue - 半连接队列

    • 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
  • accept queue - 全连接队列

    • 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
    • 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client

netty 中

可以通过 .option(ChannelOption.SO_BACKLOG, 值) 来设置大小

可以通过下面源码查看默认大小

1
2
3
4
5
6
public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
implements ServerSocketChannelConfig {

private volatile int backlog = NetUtil.SOMAXCONN;
// ...
}

课堂调试关键断点为:io.netty.channel.nio.NioEventLoop#processSelectedKey

oio 中更容易说明,不用 debug 模式

1
2
3
4
5
6
7
8
public class Server {
public static void main(String[] args) throws IOException {
ServerSocket ss = new ServerSocket(8888, 2);
Socket accept = ss.accept();
System.out.println(accept);
System.in.read();
}
}

客户端启动 4 个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Client {
public static void main(String[] args) throws IOException {
try {
Socket s = new Socket();
System.out.println(new Date()+" connecting...");
s.connect(new InetSocketAddress("localhost", 8888),1000);
System.out.println(new Date()+" connected...");
s.getOutputStream().write(1);
System.in.read();
} catch (IOException e) {
System.out.println(new Date()+" connecting timeout...");
e.printStackTrace();
}
}
}

第 1,2,3 个客户端都打印,但除了第一个处于 accpet 外,其它两个都处于 accept queue 中

1
2
Tue Apr 21 20:30:28 CST 2020 connecting...
Tue Apr 21 20:30:28 CST 2020 connected...

第 4 个客户端连接时

1
2
3
Tue Apr 21 20:53:58 CST 2020 connecting...
Tue Apr 21 20:53:59 CST 2020 connecting timeout...
java.net.SocketTimeoutException: connect timed out

查看某一变量的默认值的思路:

笔记:find usage可查看变量在什么地方被引用 ,然后沿着这个变量的赋值量去找到它初始赋值的位置,Idea右侧光亮处表示当前被选中的变量被使用

3)ulimit -n (允许一个进程能够同时打开文件描述符的数量)

  • 属于操作系统参数(这是在linux系统中配置的)

4)TCP_NODELAY ()

  • 属于 SocketChannal 参数

5)SO_SNDBUF & SO_RCVBUF

  • SO_SNDBUF 属于 SocketChannal 参数
  • SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)

6)ALLOCATOR

  • 属于 SocketChannal 参数
  • 用来分配 ByteBuf, ctx.alloc()

7)RCVBUF_ALLOCATOR

  • 属于 SocketChannal 参数
  • 控制 netty 接收缓冲区大小
  • 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定

1.3 RPC 框架

1)准备工作

这些代码可以认为是现成的,无需从头编写练习

为了简化起见,在原来聊天项目的基础上新增 Rpc 请求和响应消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Data
public abstract class Message implements Serializable {

// 省略旧的代码

public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;

static {
// ...
messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
}

}

请求消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Getter
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {

/**
* 调用的接口全限定名,服务端根据它找到实现
*/
private String interfaceName;
/**
* 调用接口中的方法名
*/
private String methodName;
/**
* 方法返回类型
*/
private Class<?> returnType;
/**
* 方法参数类型数组
*/
private Class[] parameterTypes;
/**
* 方法参数值数组
*/
private Object[] parameterValue;

public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
super.setSequenceId(sequenceId);
this.interfaceName = interfaceName;
this.methodName = methodName;
this.returnType = returnType;
this.parameterTypes = parameterTypes;
this.parameterValue = parameterValue;
}

@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_REQUEST;
}
}

响应消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {
/**
* 返回值
*/
private Object returnValue;
/**
* 异常值
*/
private Exception exceptionValue;

@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_RESPONSE;
}
}

服务器架子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Slf4j
public class RpcServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();

// rpc 请求消息处理器,待实现
RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_HANDLER);
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}

客户端架子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class RpcClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();

// rpc 响应消息处理器,待实现
RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_HANDLER);
}
});
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
channel.closeFuture().sync();
} catch (Exception e) {
log.error("client error", e);
} finally {
group.shutdownGracefully();
}
}
}

服务器端的 service 获取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ServicesFactory {

static Properties properties;
static Map<Class<?>, Object> map = new ConcurrentHashMap<>();

static {
try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
properties = new Properties();
properties.load(in);
Set<String> names = properties.stringPropertyNames();
for (String name : names) {
if (name.endsWith("Service")) {
Class<?> interfaceClass = Class.forName(name);
Class<?> instanceClass = Class.forName(properties.getProperty(name));
map.put(interfaceClass, instanceClass.newInstance());
}
}
} catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new ExceptionInInitializerError(e);
}
}

public static <T> T getService(Class<T> interfaceClass) {
return (T) map.get(interfaceClass);
}
}

相关配置 application.properties

1
2
serializer.algorithm=Json
cn.itcast.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl

2)服务器 handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {
RpcResponseMessage response = new RpcResponseMessage();
response.setSequenceId(message.getSequenceId());
try {
// 获取真正的实现对象
HelloService service = (HelloService)
ServicesFactory.getService(Class.forName(message.getInterfaceName()));

// 获取要调用的方法
Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());

// 调用方法
Object invoke = method.invoke(service, message.getParameterValue());
// 调用成功
response.setReturnValue(invoke);
} catch (Exception e) {
e.printStackTrace();
// 调用异常
response.setExceptionValue(e);
}
// 返回结果
ctx.writeAndFlush(response);
}
}

3)客户端代码第一版

只发消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Slf4j
public class RpcClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_HANDLER);
}
});
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();

ChannelFuture future = channel.writeAndFlush(new RpcRequestMessage(
1,
"cn.itcast.server.service.HelloService",
// cn.itcast.server.service.HelloService
"sayHello",
String.class,
new Class[]{String.class},
new Object[]{"张三"}
)).addListener(promise -> {
if (!promise.isSuccess()) {
Throwable cause = promise.cause();
log.error("error", cause);
}
});

channel.closeFuture().sync();
} catch (Exception e) {
log.error("client error", e);
} finally {
group.shutdownGracefully();
}
}
}

理一下流程:

  1. 客户端发送请求消息,请求消息就找到 Pipeline 中的出战处理器,从下向上依次执行,比如这里只有两个出战处理器,先通过 MESSAGE_CODEC处理器 对请求消息进行编码,然后记录日志,最后请求消息就发出去了

image-20230203121037606

  1. 消息发出之后,服务器端就拿到消息,就进行入站处理,做半包、黏包处理,记录日志,消息解码,最后交给RPC的请求handler

image-20230203121457206

  1. 拿到rpc请求消息之后,根据消息信息得到接口,根据接口再得到真的实现对象;找到要调用的方法;然后反射进行调用;最后根据成功还是异常来将结果放入response响应消息中,响应消息通过ctx来返回。

image-20230203122403827

  1. 响应消息又会经过服务器端的出战处理,经过消息编码,记录日志,然后发给客户端

image-20230203122727101

  1. 客户端再做入站处理,对消息做黏包半包处理,记录日志,消息解码,最后交给了客户端的RPC的Handler

image-20230203123026745

  1. 客户端的rpc handler最后把消息打印

image-20230203123144421

4)客户端 handler 第一版

1
2
3
4
5
6
7
8
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
log.debug("{}", msg);
}
}

5)客户端代码 第二版

包括 channel 管理,代理,接收结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package cn.itcast.client.Rpc;

import cn.itcast.message.Rpc.RpcRequestMessage;
import cn.itcast.protocol.MessageCodecSharable;
import cn.itcast.protocol.ProcotolFrameDecoder;
import cn.itcast.protocol.SequenceIdGenerator;
import cn.itcast.server.handler.Rpc.RpcResponseMessageHandler;
import cn.itcast.server.service.HelloService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultPromise;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.Proxy;

/**
* @author : 其然乐衣Letitbe
* @date : 2023/2/3
*/
@Slf4j
public class RpcClientManager {
private static Channel channel = null;
public static void main(String[] args) {
HelloService service = getProxyService(HelloService.class);
System.out.println(service.sayHello("zhangsan"));
System.out.println(service.sayHello("lisi"));
System.out.println(service.sayHello("wangwu"));

}

/**
* 创建一个代理类
* 实现远程调用的接口,代理类里所有方法的调用,都会多做一件事,就是把本身方法的调用转化成 rpc 的请求消息,
* 再由代理类去调用channel去发送消息,这样就把复杂的过程屏蔽起来了
*/
public static <T> T getProxyService(Class<T> serviceClass) {
// 获取类加载器
ClassLoader loader = serviceClass.getClassLoader();
// 代理类要实现的接口
Class<?>[] interfaces = new Class[]{serviceClass};

// (proxy:代理对象 method:代理类正在执行的方法 args:方法的实际参数)
// 代理类里的任何一个方法的调用都会进入这个 (proxy, method,args)->{} lambda表达式
Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {
// 1. 将方法调用转换为 消息对象
int sequenceId = SequenceIdGenerator.nextId();
RpcRequestMessage msg = new RpcRequestMessage(
sequenceId, // 让序列号id唯一
serviceClass.getName(), // 接口类名
method.getName(),
method.getReturnType(),
method.getParameterTypes(), // 方法的参数类型
args // 方法的参数
);
// 2. 将消息对象发送出去
getChannel().writeAndFlush(msg);

// 3. 准备一个空 Promise 对象,来接收结果 指定 promise 对象异步接收的结果线程
DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);

// 4. 等来 promise 的结果
// ( 这个结果其实就是 异步的网络调用,通过同步的方式来等待结果,其中用到了promise )
// 因为要等promise中有结果了才返回,所以这里要用promise的同步方法
// await()将来无论有成功失败都不会抛异常,而sync()失败会抛异常
promise.await();
if (promise.isSuccess()) {
// 调用正常
return promise.getNow();
} else {
// 调用失败
throw new RuntimeException(promise.cause());
}
});
return (T) o;
}


/**
* 用单例模式来 实现 channel 只被初始化一次
*/
private static final Object LOCK = new Object();
/**
* 获取唯一的channel对像
*/
public static Channel getChannel() {
if (channel != null) {
return channel;
}
initChannel();
return channel;
}

/**
* 初始化 channel 方法
*/
private static void initChannel() {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();

// rpc 响应消息处理器,待实现
RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();

Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_HANDLER);
}
});

try {
channel = bootstrap.connect("localhost", 8080).sync().channel();

// 这里必须用异步,否则 getChannel 永远返回不了而一直处于阻塞直达 channel 关闭
channel.closeFuture().addListener(future -> {
group.shutdownGracefully();
});
} catch (Exception e) {
log.error("client error", e);
}
}
}

6)客户端 handler 第二版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import cn.itcast.message.Rpc.RpcResponseMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author : 其然乐衣Letitbe
* @date : 2023/2/3
*/
@Slf4j
/** 设置为Sharable前提是(考虑到)线程安全的 */
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {

/** 序号 用来接收共享的 promise 对象 ConcurrentHashMap 是线程安全的 */
public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();

@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
log.debug("{}", msg);
// 拿到空的 promise remove返回并移除(避免promise越积越多)
Promise<Object> promise = PROMISES.remove(msg.getSequenceId());

if (promise != null) {
Object returnValue = msg.getReturnValue();
Exception exceptionValue = msg.getExceptionValue();
if (exceptionValue != null) {
promise.setFailure(exceptionValue);
} else {
promise.setSuccess(returnValue);
}
}
}
}

image-20230204143132005

首先,消息的发送方将消息对象发送出去,但是一时半会结果回不来那么快,所以需要一个Promise对象来接收消息,而Promise对象放在了PROMISES集合当中,Promise对象调用同步方法来等结果;这是就到了RpcResonseMessageHandler,假如它接收到了服务器端返回来的消息,它就根据消息队列号来从Promise集合中取出还未填充结果的promise,如果promise != null,就判断结果中是正常还是异常(根据exceptionValue,如果不为null就证明有异常,否则没有异常),而无论是否有异常,都会让同步等待结果中的 await() 结束等待,恢复运行,然后再通过.isSuccess()来判断结果是否异常(因为await无论结果成功失败它都不会抛异常的)

image-20230204161347823

我悟了!代理类创建好容器放入map中。handler处理完毕将结果放入map中。main线程从map中取

2. 源码分析

2.1 启动剖析

我们就来看看 netty 中对下面的代码是怎样进行处理的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//1 netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
Selector selector = Selector.open();

//2 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
NioServerSocketChannel attachment = new NioServerSocketChannel();

//3 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);

//4 启动 nio boss 线程执行接下来的操作

//5 注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);

//6 head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor

//7 绑定端口
serverSocketChannel.bind(new InetSocketAddress(8080));

//8 触发 channel active 事件,在 head 中关注 op_accept 事件
selectionKey.interestOps(SelectionKey.OP_ACCEPT);

入口 io.netty.bootstrap.ServerBootstrap#bind

关键代码 io.netty.bootstrap.AbstractBootstrap#doBind

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private ChannelFuture doBind(final SocketAddress localAddress) {
// 1. 执行初始化和注册 regFuture 会由 initAndRegister 设置其是否完成,从而回调 3.2 处代码
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

// 2. 因为是 initAndRegister 异步执行,需要分两种情况来看,调试时也需要通过 suspend 断点类型加以区分
// 2.1 如果已经完成
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
// 3.1 立刻调用 doBind0
doBind0(regFuture, channel, localAddress, promise);
return promise;
}
// 2.2 还没有完成
else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// 3.2 回调 doBind0
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// 处理异常...
promise.setFailure(cause);
} else {
promise.registered();
// 3. 由注册线程去执行 doBind0
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}

关键代码 io.netty.bootstrap.AbstractBootstrap#initAndRegister

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
// 1.1 初始化 - 做的事就是添加一个初始化器 ChannelInitializer
init(channel);
} catch (Throwable t) {
// 处理异常...
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}

// 1.2 注册 - 做的事就是将原生 channel 注册到 selector 上
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
// 处理异常...
}
return regFuture;
}

关键代码 io.netty.bootstrap.ServerBootstrap#init

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// 这里 channel 实际上是 NioServerSocketChannel
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}

final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}

ChannelPipeline p = channel.pipeline();

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}

// 为 NioServerSocketChannel 添加初始化器
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

// 初始化器的职责是将 ServerBootstrapAcceptor 加入至 NioServerSocketChannel
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#register

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 一些检查,略...

AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 首次执行 execute 方法时,会启动 nio 线程,之后注册等操作在 nio 线程上执行
// 因为只有一个 NioServerSocketChannel 因此,也只会有一个 boss nio 线程
// 这行代码完成的事实是 main -> nio boss 线程的切换
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// 日志记录...
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

io.netty.channel.AbstractChannel.AbstractUnsafe#register0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 1.2.1 原生的 nio channel 绑定到 selector 上,注意此时没有注册 selector 关注事件,附件为 NioServerSocketChannel
doRegister();
neverRegistered = false;
registered = true;

// 1.2.2 执行 NioServerSocketChannel 初始化器的 initChannel
pipeline.invokeHandlerAddedIfNeeded();

// 回调 3.2 io.netty.bootstrap.AbstractBootstrap#doBind0
safeSetSuccess(promise);
pipeline.fireChannelRegistered();

// 对应 server socket channel 还未绑定,isActive 为 false
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

关键代码 io.netty.channel.ChannelInitializer#initChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
// 1.2.2.1 执行初始化
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
// 1.2.2.2 移除初始化器
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
}
return true;
}
return false;
}

关键代码 io.netty.bootstrap.AbstractBootstrap#doBind0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 3.1 或 3.2 执行 doBind0
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {

channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}

关键代码 io.netty.channel.AbstractChannel.AbstractUnsafe#bind

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();

if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}

if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// 记录日志...
}

boolean wasActive = isActive();
try {
// 3.3 执行端口绑定
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}

if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
// 3.4 触发 active 事件
pipeline.fireChannelActive();
}
});
}

safeSetSuccess(promise);
}

3.3 关键代码 io.netty.channel.socket.nio.NioServerSocketChannel#doBind

1
2
3
4
5
6
7
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}

3.4 关键代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

1
2
3
4
5
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
// 触发 read (NioServerSocketChannel 上的 read 不是读取数据,只是为了触发 channel 的事件注册)
readIfIsAutoRead();
}

关键代码 io.netty.channel.nio.AbstractNioChannel#doBeginRead

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}

readPending = true;

final int interestOps = selectionKey.interestOps();
// readInterestOp 取值是 16,在 NioServerSocketChannel 创建时初始化好,代表关注 accept 事件
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}

2.2 NioEventLoop 剖析

NioEventLoop 线程不仅要处理 IO 事件,还要处理 Task(包括普通任务和定时任务),

提交任务代码 io.netty.util.concurrent.SingleThreadEventExecutor#execute

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}

boolean inEventLoop = inEventLoop();
// 添加任务,其中队列使用了 jctools 提供的 mpsc 无锁队列
addTask(task);
if (!inEventLoop) {
// inEventLoop 如果为 false 表示由其它线程来调用 execute,即首次调用,这时需要向 eventLoop 提交首个任务,启动死循环,会执行到下面的 doStartThread
startThread();
if (isShutdown()) {
// 如果已经 shutdown,做拒绝逻辑,代码略...
}
}

if (!addTaskWakesUp && wakesUpForTask(task)) {
// 如果线程由于 IO select 阻塞了,添加的任务的线程需要负责唤醒 NioEventLoop 线程
wakeup(inEventLoop);
}
}

唤醒 select 阻塞线程io.netty.channel.nio.NioEventLoop#wakeup

1
2
3
4
5
6
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}

启动 EventLoop 主循环 io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
// 将线程池的当前线程保存在成员变量中,以便后续使用
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}

boolean success = false;
updateLastExecutionTime();
try {
// 调用外部类 SingleThreadEventExecutor 的 run 方法,进入死循环,run 方法见下
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// 清理工作,代码略...
}
}
});
}

io.netty.channel.nio.NioEventLoop#run 主要任务是执行死循环,不断看有没有新任务,有没有 IO 事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
protected void run() {
for (;;) {
try {
try {
// calculateStrategy 的逻辑如下:
// 有任务,会执行一次 selectNow,清除上一次的 wakeup 结果,无论有没有 IO 事件,都会跳过 switch
// 没有任务,会匹配 SelectStrategy.SELECT,看是否应当阻塞
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;

case SelectStrategy.BUSY_WAIT:

case SelectStrategy.SELECT:
// 因为 IO 线程和提交任务线程都有可能执行 wakeup,而 wakeup 属于比较昂贵的操作,因此使用了一个原子布尔对象 wakenUp,它取值为 true 时,表示该由当前线程唤醒
// 进行 select 阻塞,并设置唤醒状态为 false
boolean oldWakenUp = wakenUp.getAndSet(false);

// 如果在这个位置,非 EventLoop 线程抢先将 wakenUp 置为 true,并 wakeup
// 下面的 select 方法不会阻塞
// 等 runAllTasks 处理完成后,到再循环进来这个阶段新增的任务会不会及时执行呢?
// 因为 oldWakenUp 为 true,因此下面的 select 方法就会阻塞,直到超时
// 才能执行,让 select 方法无谓阻塞
select(oldWakenUp);

if (wakenUp.get()) {
selector.wakeup();
}
default:
}
} catch (IOException e) {
rebuildSelector0();
handleLoopException(e);
continue;
}

cancelledKeys = 0;
needsToSelectAgain = false;
// ioRatio 默认是 50
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// ioRatio 为 100 时,总是运行完所有非 IO 任务
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// 记录 io 事件处理耗时
final long ioTime = System.nanoTime() - ioStartTime;
// 运行非 IO 任务,一旦超时会退出 runAllTasks
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}

⚠️ 注意

这里有个费解的地方就是 wakeup,它既可以由提交任务的线程来调用(比较好理解),也可以由 EventLoop 线程来调用(比较费解),这里要知道 wakeup 方法的效果:

  • 由非 EventLoop 线程调用,会唤醒当前在执行 select 阻塞的 EventLoop 线程
  • 由 EventLoop 自己调用,会本次的 wakeup 会取消下一次的 select 操作

参考下图

io.netty.channel.nio.NioEventLoop#select

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// 计算等待时间
// * 没有 scheduledTask,超时时间为 1s
// * 有 scheduledTask,超时时间为 `下一个定时任务执行时间 - 当前时间`
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
// 如果超时,退出循环
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}

// 如果期间又有 task 退出循环,如果没这个判断,那么任务就会等到下次 select 超时时才能被执行
// wakenUp.compareAndSet(false, true) 是让非 NioEventLoop 不必再执行 wakeup
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}

// select 有限时阻塞
// 注意 nio 有 bug,当 bug 出现时,select 方法即使没有时间发生,也不会阻塞住,导致不断空轮询,cpu 占用 100%
int selectedKeys = selector.select(timeoutMillis);
// 计数加 1
selectCnt ++;

// 醒来后,如果有 IO 事件、或是由非 EventLoop 线程唤醒,或者有任务,退出循环
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
if (Thread.interrupted()) {
// 线程被打断,退出循环
// 记录日志
selectCnt = 1;
break;
}

long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// 如果超时,计数重置为 1,下次循环就会 break
selectCnt = 1;
}
// 计数超过阈值,由 io.netty.selectorAutoRebuildThreshold 指定,默认 512
// 这是为了解决 nio 空轮询 bug
else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// 重建 selector
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}

currentTimeNanos = time;
}

if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
// 记录日志
}
} catch (CancelledKeyException e) {
// 记录日志
}
}

处理 keys io.netty.channel.nio.NioEventLoop#processSelectedKeys

1
2
3
4
5
6
7
8
9
private void processSelectedKeys() {
if (selectedKeys != null) {
// 通过反射将 Selector 实现类中的就绪事件集合替换为 SelectedSelectionKeySet
// SelectedSelectionKeySet 底层为数组实现,可以提高遍历性能(原本为 HashSet)
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}

io.netty.channel.nio.NioEventLoop#processSelectedKey

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 当 key 取消或关闭时会导致这个 key 无效
if (!k.isValid()) {
// 无效时处理...
return;
}

try {
int readyOps = k.readyOps();
// 连接事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}

// 可写事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}

// 可读或可接入事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 如果是可接入 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
// 如果是可读 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

2.3 accept 剖析

nio 中如下代码,在 netty 中的流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//1 阻塞直到事件发生
selector.select();

Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
//2 拿到一个事件
SelectionKey key = iter.next();

//3 如果是 accept 事件
if (key.isAcceptable()) {

//4 执行 accept
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking(false);

//5 关注 read 事件
channel.register(selector, SelectionKey.OP_READ);
}
// ...
}

先来看可接入事件处理(accept)

io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);

boolean closed = false;
Throwable exception = null;
try {
try {
do {
// doReadMessages 中执行了 accept 并创建 NioSocketChannel 作为消息放入 readBuf
// readBuf 是一个 ArrayList 用来缓存消息
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// localRead 为 1,就一条消息,即接收一个客户端连接
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}

int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 触发 read 事件,让 pipeline 上的 handler 处理,这时是处理
// io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();

if (exception != null) {
closed = closeOnReadError(exception);

pipeline.fireExceptionCaught(exception);
}

if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}

关键代码 io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 这时的 msg 是 NioSocketChannel
final Channel child = (Channel) msg;

// NioSocketChannel 添加 childHandler 即初始化器
child.pipeline().addLast(childHandler);

// 设置选项
setChannelOptions(child, childOptions, logger);

for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}

try {
// 注册 NioSocketChannel 到 nio worker 线程,接下来的处理也移交至 nio worker 线程
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}

又回到了熟悉的 io.netty.channel.AbstractChannel.AbstractUnsafe#register 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 一些检查,略...

AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 这行代码完成的事实是 nio boss -> nio worker 线程的切换
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// 日志记录...
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

io.netty.channel.AbstractChannel.AbstractUnsafe#register0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;

// 执行初始化器,执行前 pipeline 中只有 head -> 初始化器 -> tail
pipeline.invokeHandlerAddedIfNeeded();
// 执行后就是 head -> logging handler -> my handler -> tail

safeSetSuccess(promise);
pipeline.fireChannelRegistered();

if (isActive()) {
if (firstRegistration) {
// 触发 pipeline 上 active 事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

回到了熟悉的代码 io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

1
2
3
4
5
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
// 触发 read (NioSocketChannel 这里 read,只是为了触发 channel 的事件注册,还未涉及数据读取)
readIfIsAutoRead();
}

io.netty.channel.nio.AbstractNioChannel#doBeginRead

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}

readPending = true;
// 这时候 interestOps 是 0
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// 关注 read 事件
selectionKey.interestOps(interestOps | readInterestOp);
}
}

2.4 read 剖析

再来看可读事件 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read,注意发送的数据未必能够一次读完,因此会触发多次 nio read 事件,一次事件内会触发多次 pipeline read,一次事件会触发一次 pipeline read complete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
// io.netty.allocator.type 决定 allocator 的实现
final ByteBufAllocator allocator = config.getAllocator();
// 用来分配 byteBuf,确定单次读取大小
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);

ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
// 读取
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}

allocHandle.incMessagesRead(1);
readPending = false;
// 触发 read 事件,让 pipeline 上的 handler 处理,这时是处理 NioSocketChannel 上的 handler
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
}
// 是否要继续循环
while (allocHandle.continueReading());

allocHandle.readComplete();
// 触发 read complete 事件
pipeline.fireChannelReadComplete();

if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}

io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#continueReading(io.netty.util.UncheckedBooleanSupplier)

1
2
3
4
5
6
7
8
9
10
11
12
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return
// 一般为 true
config.isAutoRead() &&
// respectMaybeMoreData 默认为 true
// maybeMoreDataSupplier 的逻辑是如果预期读取字节与实际读取字节相等,返回 true
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
// 小于最大次数,maxMessagePerRead 默认 16
totalMessages < maxMessagePerRead &&
// 实际读到了数据
totalBytesRead > 0;
}