# 添加依赖

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.72.Final</version>
</dependency>

# 创建服务

{.line_number}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import java.net.InetSocketAddress;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
* Netty 服务端
*
* @author fangjiaxiaobai
* @date 2022-01-07 23:37
*/
@Slf4j
@Component
public class RouterServer2 {

/**
* boss 线程组用于处理连接工作
*/
private final EventLoopGroup boss = new NioEventLoopGroup();

/**
* work 线程组用于数据处理
*/
private final EventLoopGroup work = new NioEventLoopGroup();

@Resource
private ChannelHandler childHandler;

/**
* SpringBoot 启动的时候 调用
*
* @throws InterruptedException 中断异常
*/
@PostConstruct
public void init() throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, work)
// 指定Channel
.channel(NioServerSocketChannel.class)
//使用指定的端口设置套接字地址
.localAddress(new InetSocketAddress(7788))
//服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
.option(ChannelOption.SO_BACKLOG, 1024)
//设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
.childOption(ChannelOption.SO_KEEPALIVE, true)
//将小的数据包包装成更大的帧进行传送,提高网络的负载,即TCP延迟传输
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<Channel>() { ④
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
//添加编码器 ⓪
.addLast(new RouterMessageEncode())
//添加Netty 自带的 换行解码器(用来解决 沾包,拆包) ①
.addLast(new LineBasedFrameDecoder(1024))
//添加自定义的 解码器 ②
.addLast(new RouterMessageDecode())
//添加 接收消息的 处理器 ③
.addLast(new ServiceMessageReceiveHandler());
}
});
ChannelFuture future = bootstrap.bind().sync();
if (future.isSuccess()) {
log.info("start router Server success on port 7788");
}
}

/**
* SpringBoot 销毁的时候 调用
*
* @throws InterruptedException 中断异常
*/
@PreDestroy
public void destory() throws InterruptedException {
boss.shutdownGracefully().sync();
work.shutdownGracefully().sync();
log.info("关闭Netty");
}
}

🔥 ⓪: RouterMessageEncode : 这个是我们自己实现的消息编码器。
🔥①: 使用 Netty 提供的包处理工具,处理粘包问题已经用来拆包。
🔥②: 自定义的解码器。
🔥③: 自定义的消息处理器。 通用用来编写业务逻辑。
🔥④: 这里配置的是在 Netty 启动时,需要添加的 Channel .

# 自定义编码器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

/**
* 编码器
*
* @author fangjiaxiaobai
* @date 2022-01-07 23:50
*/
@Slf4j
public class RouterMessageEncode extends MessageToMessageEncoder<String> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, String message,
List<Object> list) throws Exception {
list.add(ByteBufUtil.encodeString(channelHandlerContext.alloc(), CharBuffer.wrap(message), Charset.defaultCharset()));
}
}

# 自定义解码器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.nio.charset.Charset;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

/**
* 解码器
*
* @author fangjiaxiaobai
* @date 2022-01-07 23:54
*/
@Slf4j
public class RouterMessageDecode extends MessageToMessageDecoder<ByteBuf> {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> list) throws Exception {
// todo 解码
String message = msg.toString(Charset.defaultCharset());
list.add(message);
}
}

# 消息处理逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;

/**
* 消息处理器
*
* @author fangjiaxiaobai
* @date 2022-01-07 23:59
*/
@Slf4j
public class ServiceMessageReceiveHandler extends SimpleChannelInboundHandler<String> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("收到消息:" + msg);
}

/**
* 注册一个 channel
*
* @param ctx ChannelHandlerContext
* @throws Exception 异常
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
log.info("channel registed ...");
}

/**
* channel 退出
*
* @param ctx
* @throws Exception
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
log.info("channel unregistered ....");
}

/**
* 接收到的事件
*
* @param ctx ChannelHandlerContext
* @param evt Object
* @throws Exception 异常
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
System.out.println("event:" + event.state());
}
}

/**
* 通道异常 应该在此关闭通道
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}

# 测试

编写一个客户端进行测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringEncoder;

/**
* @author fangjiaxiaobai
* @date 2022-01-08 00:40
*/
public class RouteClient {

public static void main(String[] args) throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group);
// 第2步 绑定客户端通道
bootstrap.channel(NioSocketChannel.class);
//第3步 给NIoSocketChannel初始化handler, 处理读写事件
//通道是NioSocketChannel
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//字符串编码器,一定要加在SimpleClientHandler 的上面
ch.pipeline().addLast(new StringEncoder())
.addLast(new DelimiterBasedFrameDecoder(
Integer.MAX_VALUE, Delimiters.lineDelimiter()[0]));
}
});
//连接服务器
ChannelFuture future = bootstrap.connect("localhost", 7788).sync();
System.out.println(future.isSuccess());
for (int i = 0; i < 10; i++) {
future.channel().writeAndFlush("test netty message\r\n").sync(); ⑤
}
System.out.println("发送成功。。。。。");
}
}

🔥⑤ 注意这里,需要加上 \r\n , 否则你永远收不到消息。

# 最后

期望与你一起遇见更好的自己

期望与你一起遇见更好的自己

更新于 阅读次数

请我喝[咖啡]~( ̄▽ ̄)~*

方小白 微信支付

微信支付

方小白 支付宝

支付宝

方小白 numberpay

numberpay