# 添加依赖
1 2 3 4 5 6
| <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.72.Final</version> </dependency>
|
# 创建服务
{.line_number}1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
| import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import java.net.InetSocketAddress; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component;
@Slf4j @Component public class RouterServer2 {
private final EventLoopGroup boss = new NioEventLoopGroup();
private final EventLoopGroup work = new NioEventLoopGroup();
@Resource private ChannelHandler childHandler;
@PostConstruct public void init() throws InterruptedException { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, work) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(7788)) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer<Channel>() { ④ @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline() .addLast(new RouterMessageEncode()) .addLast(new LineBasedFrameDecoder(1024)) .addLast(new RouterMessageDecode()) .addLast(new ServiceMessageReceiveHandler()); } }); ChannelFuture future = bootstrap.bind().sync(); if (future.isSuccess()) { log.info("start router Server success on port 7788"); } }
@PreDestroy public void destory() throws InterruptedException { boss.shutdownGracefully().sync(); work.shutdownGracefully().sync(); log.info("关闭Netty"); } }
|
🔥 ⓪: RouterMessageEncode
: 这个是我们自己实现的消息编码器。
🔥①: 使用 Netty
提供的包处理工具,处理粘包问题以及用来拆包。
🔥②: 自定义的解码器。
🔥③: 自定义的消息处理器。 通常用来编写业务逻辑。
🔥④: 这里配置的是每当有新连接建立时,需要添加到该连接的 Channel
的 pipeline 中的处理器(ChannelHandler)。
# 自定义编码器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; import java.nio.CharBuffer; import java.nio.charset.Charset; import java.util.List; import lombok.extern.slf4j.Slf4j;
/**
 * Outbound encoder that turns a {@code String} message into a buffer of bytes
 * using the JVM's default charset.
 */
@Slf4j
public class RouterMessageEncode extends MessageToMessageEncoder<String> {

    /**
     * Encodes one outbound message and appends the resulting buffer to {@code out}.
     *
     * @param ctx     context whose allocator supplies the output buffer
     * @param message text to encode
     * @param out     list receiving the encoded buffer
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, String message, List<Object> out)
            throws Exception {
        // NOTE(review): the default charset is platform dependent — confirm
        // that every peer uses the same one.
        final Charset platformCharset = Charset.defaultCharset();
        final CharBuffer chars = CharBuffer.wrap(message);
        out.add(ByteBufUtil.encodeString(ctx.alloc(), chars, platformCharset));
    }
}
|
# 自定义解码器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import java.nio.charset.Charset; import java.util.List; import lombok.extern.slf4j.Slf4j;
/**
 * Inbound decoder that converts a {@code ByteBuf} into a {@code String} using
 * the JVM's default charset.
 */
@Slf4j
public class RouterMessageDecode extends MessageToMessageDecoder<ByteBuf> {

    /**
     * Decodes one inbound buffer and appends the resulting text to {@code out}.
     *
     * @param ctx context of this handler (unused)
     * @param msg buffer holding the bytes of one message
     * @param out list receiving the decoded string
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
            throws Exception {
        // NOTE(review): the default charset is platform dependent — confirm
        // that every peer uses the same one.
        final Charset platformCharset = Charset.defaultCharset();
        out.add(msg.toString(platformCharset));
    }
}
|
# 消息处理逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j;
@Slf4j public class ServiceMessageReceiveHandler extends SimpleChannelInboundHandler<String> {
@Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("收到消息:" + msg); }
@Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { super.channelRegistered(ctx); log.info("channel registed ..."); }
@Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { super.channelUnregistered(ctx); log.info("channel unregistered ...."); }
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { super.userEventTriggered(ctx, evt); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; System.out.println("event:" + event.state()); } }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); } }
|
# 测试
编写一个客户端进行测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringEncoder;
public class RouteClient {
public static void main(String[] args) throws InterruptedException { Bootstrap bootstrap = new Bootstrap(); EventLoopGroup group = new NioEventLoopGroup(); bootstrap.group(group); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder()) .addLast(new DelimiterBasedFrameDecoder( Integer.MAX_VALUE, Delimiters.lineDelimiter()[0])); } }); ChannelFuture future = bootstrap.connect("localhost", 7788).sync(); System.out.println(future.isSuccess()); for (int i = 0; i < 10; i++) { future.channel().writeAndFlush("test netty message\r\n").sync(); ⑤ } System.out.println("发送成功。。。。。"); } }
|
🔥⑤ 注意这里,需要加上 \r\n
, 因为服务端使用 LineBasedFrameDecoder 按行拆分消息,否则你永远收不到消息。
# 最后
期望与你一起遇见更好的自己
扫码或搜索:方家小白
发送 290992
即可立即永久解锁本站全部文章