Netty初探

Netty 是一个基于 Java NIO 的异步、事件驱动的网络应用框架,旨在简化 TCP/UDP 等协议服务器和客户端的开发。它封装了底层的 NIO 细节,提供了易用的 API,广泛应用于高性能网络通信场景,如 Dubbo、RocketMQ、Elasticsearch 等中间件。

核心组件

  1. Channel

表示一个网络连接的抽象,如 SocketChannel。它是数据读写的通道,支持异步非阻塞操作。

  1. EventLoop & EventLoopGroup

EventLoop 是处理 I/O 操作的核心,绑定到一个线程,负责处理 Channel 的所有事件。EventLoopGroup 是 EventLoop 的集合,用于管理多个 EventLoop,实现多线程处理。developer.aliyun.com+2processon.com+2learn.lianglianglee.com+2developer.aliyun.com

  1. ChannelHandler & ChannelPipeline

ChannelHandler 是处理入站和出站数据的处理器,ChannelPipeline 是 ChannelHandler 的链式结构,定义了数据处理的流程。

  1. Bootstrap & ServerBootstrap

用于初始化客户端和服务器的辅助类,配置 Channel、EventLoopGroup、Handler 等。

  1. ByteBuf

Netty 自定义的字节缓冲区,替代了 JDK 的 ByteBuffer,提供更高效的内存管理和读写操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

// 创建一个容量为 256 字节的缓冲区
ByteBuf buffer = Unpooled.buffer(256);

// 使用已存在的字节数组创建 ByteBuf(共享数据)
byte[] data = new byte[]{1, 2, 3};
ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(data);

// 复制字节数组创建 ByteBuf(数据独立)
ByteBuf copiedBuffer = Unpooled.copiedBuffer(data);

Echo服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class EchoServer {
private final int port;

public EchoServer(int port) {
this.port = port;
}

public void start() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new EchoServerHandler());
}
});

ChannelFuture f = b.bind(port).sync();
System.out.println("Echo server started on port " + port);
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
new EchoServer(8080).start();
}
}

class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg); // 将接收到的消息写回客户端
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush(); // 刷新所有待处理的消息到远程节点
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close(); // 关闭连接
}
}

Http服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;

public class HttpHelloWorldServer {
private final int port;

public HttpHelloWorldServer(int port) {
this.port = port;
}

public void start() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new HttpObjectAggregator(65536));
ch.pipeline().addLast(new SimpleChannelInboundHandler<FullHttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
FullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.OK,
ctx.alloc().buffer().writeBytes("Hello, World!".getBytes())
);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
ctx.writeAndFlush(response);
}
});
}
});

ChannelFuture f = b.bind(port).sync();
System.out.println("HTTP server started on port " + port);
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
new HttpHelloWorldServer(8080).start();
}
}

WebSocket服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.stream.ChunkedWriteHandler;

public class WebSocketServer {
private final int port;

public WebSocketServer(int port) {
this.port = port;
}

public void start() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new HttpObjectAggregator(65536));
ch.pipeline().addLast(new ChunkedWriteHandler());
ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));
ch.pipeline().addLast(new SimpleChannelInboundHandler<TextWebSocketFrame>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
System.out.println("Received: " + msg.text());
ctx.channel().writeAndFlush(new TextWebSocketFrame("Echo: " + msg.text()));
}
});
}
});

ChannelFuture f = b.bind(port).sync();
System.out.println("WebSocket server started on port " + port);
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
new WebSocketServer(8080).start();
}
}

-------------本文结束感谢您的阅读-------------
感谢阅读.

欢迎关注我的其它发布渠道