Netty学习笔记
Netty是一个基于NIO的客户端-服务器框架,用于支持快速、容易的开发可扩展(Scalable)的网络应用。Netty将网络开发的复杂性隔离出去——将网络处理代码与业务逻辑代码解耦,提供了便于使用的API。
一般来说,网络应用都具有可扩展性问题(Scalability),Netty能解决可扩展性问题的根本是它的异步特性(Asynchronous nature)。
在计算机科学里,引入额外的抽象层是屏蔽复杂性的通用手段。Netty引入了这样的一个抽象层以简化TCP、UDP编程,但是Netty仍然允许通过此抽象层访问低级的API。下面是Netty的特性表格:
开发区域 | Netty特性 |
设计 |
|
易用性 |
|
性能 |
|
健壮性 |
|
安全性 |
|
服务器端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
package cc.gmem.study.network.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.CharsetUtil; import java.net.InetSocketAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cc.gmem.study.network.basic.Helper; public class EchoServer { private static final Logger LOGGER = LoggerFactory.getLogger( EchoServer.class ); private final int port; public EchoServer( int port ) { this.port = port; } public void start() throws Exception { //事件循环组将网络编程的线程处理部分很好的封装起来 EventLoopGroup group = new NioEventLoopGroup(); try { //要启动服务端,必须实例化下面这个实例 ServerBootstrap b = new ServerBootstrap(); b //指定使用的事件循环组,该组用于接受连接、接收/发送数据,等等 //可以指定两个事件循环组,分别用于接受连接、读写数据 .group( group ) //使用非阻塞的套接字传输,必须指定此服务器通道类型,以监听并处理客户端连接 .channel( NioServerSocketChannel.class ) //绑定的本地监听地址 .localAddress( new InetSocketAddress( port ) ) //当连接被接受后,会创建NioServerSocketChannel的子通道SocketChannel //childHandler方法用于指定子通道的处理器,通常为ChannelInitializer .childHandler( new ChannelInitializer() { public void initChannel( SocketChannel ch ) throws Exception { //管道(Pipeline)持有某个通道的全部处理器 ChannelPipeline pipeline = ch.pipeline(); //添加一个处理器 pipeline.addLast( new EchoServerHandler() ); } } ); //执行绑定,等待直到成功 ChannelFuture f = b.bind().sync(); LOGGER.debug( " started and listen on {}", f.channel().localAddress() ); //等待服务器通道被关闭 f.channel().closeFuture().sync(); } finally { //关闭事件循环,释放相关资源(包括创建的线程) group.shutdownGracefully().sync(); } } public static void main( String[] args ) throws Exception { new EchoServer( Helper.DEFAULT_PORT ).start(); } //该注解表示通道处理器类将被不同通道共享 @Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { /** * 当从通道读取到数据后,会执行该回调 * 注意:数据可能碎片化,分若干次读取,这种情况下,该回调会被执行多次 */ public void channelRead( ChannelHandlerContext ctx, Object msg ) { ByteBuf buf = (ByteBuf) msg;//注意这里不一定能收到完整的消息 LOGGER.debug( "Server received: {}", buf.toString( CharsetUtil.UTF_8 ) ); //将接收到的数据写回去,注意这里还没有将数据刷空以发送到对端(peer) //当前方法不使用channelRead0的原因:write可能在channelRead返回前尚未完成(因为异步) //如果使用channelRead0,那么msg对应的ByteBuf将被自动释放(release) ctx.write( msg ); } /** * 当读取数据完毕(没有更多数据可读)后,会执行该回调 */ public void channelReadComplete( ChannelHandlerContext ctx ) { //刷空所有数据,并在执行完毕后,关闭通道 ctx.writeAndFlush( Unpooled.EMPTY_BUFFER ).addListener( ChannelFutureListener.CLOSE ); } /** * 当发生任何异常时,执行该回调 * 至少应当有一个通道处理器覆盖此方法,以实现必要的异常处理 */ public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause ) { //关闭通道 ctx.close(); } } } |
客户端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
package cc.gmem.study.network.netty; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.CharsetUtil; import java.net.InetSocketAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cc.gmem.study.network.basic.Helper; public class EchoClient { private static final Logger LOGGER = LoggerFactory.getLogger( EchoClient.class ); private final String host; private final int port; public EchoClient( String host, int port ) { this.host = host; this.port = port; } public void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { //要启动客户端,必须实例化下面这个实例 Bootstrap b = new Bootstrap(); b //指定事件循环组 .group( group ) //指定非阻塞的套接字通道类 .channel( NioSocketChannel.class ) //指定需要连接的服务器 .remoteAddress( new InetSocketAddress( host, port ) ) //指定通道的处理器,一旦连接成功就会调用此处理器 .handler( new ChannelInitializer() { public void initChannel( SocketChannel ch ) throws Exception { //在通道的尾部添加一个处理器 ch.pipeline().addLast( new EchoClientHandler() ); } } ); //连接、等待直到连接成功 ChannelFuture f = b.connect().sync(); //等待直到客户端通道被关闭 f.channel().closeFuture().sync(); } finally { //关闭事件循环,释放相关资源(包括创建的线程) group.shutdownGracefully().sync(); } } public static void main( String[] args ) throws Exception { new EchoClient( Helper.DEFAULT_HOST, Helper.DEFAULT_PORT ).start(); } @Sharable public class EchoClientHandler extends SimpleChannelInboundHandler { /** * 一旦与服务器的连接建立,即调用此回调 */ public void channelActive( ChannelHandlerContext ctx ) { //发送一个简单的消息给服务器 //Unpooled是一个工具类,可以分配新的缓冲,或者包装已经存在的字节数组、字节缓冲、字符串 //copiedBuffer方法可以根据指定的字符串和编码方式,创建一个大端(big-endian)的字节缓冲 //这里的几个汉字和标点共计21字节 ByteBuf msg = Unpooled.copiedBuffer( "服务器,你好!", CharsetUtil.UTF_8 ); //写入数据并刷空,如果不刷空,数据可能存在于本地缓冲,不发送给服务器 ctx.writeAndFlush( msg ); } /** * 从服务器接收到数据后,调用此回调。channelRead0会自动的释放(减少引用计数,如果为0则解构并回收)ByteBuf对象 * 需要注意的是:接收到的字节可能是碎片化的(channelRead0),虽然上面发送了21字节给服务器 * 但是可能分多次读取,例如:第一次读取10字节,第二次读取11字节。因此该回调可能被调用多次 * 唯一能保证的是:在使用TCP或其它面向流的协议的情况下,数据被读取的顺序有保证 */ public void channelRead0( ChannelHandlerContext ctx, ByteBuf in ) { int count = in.readableBytes();//可以获取本次可读的字节数 ByteBuf buf = in.readBytes( count ); //读取为字节缓冲 Object msg = buf.toString( CharsetUtil.UTF_8 );//转换为字符串形式 LOGGER.debug( "Client received: {}", msg ); } public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause ) { ctx.close(); } } } |
一个Netty应用从Bootstrap类(包括ServerBootstrap、Bootstrap)开始,Bootstrap是Netty用于简化Netty配置和启动的一个类。
为了支持各种协议、支持通过不同的方式来处理数据,Netty引入处理器(Handler)的概念,处理器可以处理一种或者多种Netty事件。这里的“事件”是一个泛指的概念,因为某些处理器可以执行对象——字节的转换,与一般的I/O事件并不是一个概念。
一种最常编写的、包含了大部分业务逻辑的处理器是ChannelInboundHandler,可以用来接收消息并执行相应的处理,例如可以在此处理器中编写回应消息并write/flush。
当Netty客户端连接到服务器时、服务器监听到客户端连接时,需要知道如何处理消息的收发,即,需要知道使用什么处理器。Netty提供的ChannelInitializer专门用于注册前述的处理器,通过该类,可以将各种处理器注册到通道管道(ChannelPipeline)中。ChannelInitializer本身也是一种处理器,它会在添加了其它处理器后,将自己从管道中移除。
所有Netty应用程序都是基于管道的,与管道相关的概念是事件循环(EventLoop)、事件循环组(EventLoopGroup),这三者与事件、事件处理紧密相关。
一个事件循环的目的是处理多个通道的I/O操作。事件循环组则包含多个事件循环,可以对外提供。一个事件循环在其生命周期内绑定到唯一的单个线程。需要注意的是,EventLoopGroup是EventLoop的父接口。
通道(Channel)代表了一个Socket连接,或者类似的支持I/O操作的概念。
Netty中的所有I/O操作都是异步的,因此:
- 当连接到服务器时,操作是异步完成的
- 当读写消息时,操作是异步完成的
操作不是在当前线程中立即执行,而总是在稍后被调度,到底何时操作被执行无法预先知晓,唯一能确定的是,操作被执行的顺序得到保证,先发起的操作肯定会先被执行。由于这个特征,不能从调用返回值中判断操作是否成功或结束,只能注册某种类型的监听器进行判断,Netty通过将返回值规定为ChannelFuture、Future来支持监听器(ChannnelFutureListener)的注册,监听器函数将在操作成功或者失败时得到通知。
当一个通道注册到Netty后,会被绑定到单个事件循环,此事件循环对该通道的整个生命周期负责,该通道的所有I/O操作都由事件循环对应的线程执行。
多个通道可以共享单个事件循环,因此任何时候,代码都不应该使事件循环(的底层线程)被阻塞(例如在通道处理器代码中执行数据库读写),这会严重影响Netty的性能。
Bootstrapping分为两个类型:
- Bootstrap:用于客户端,或者任何数据报(UDP)通道(DatagramChannel)。具有一个事件循环组
- ServerBootstrap:用于TCP服务器端,具有两个事件循环组(虽然可以指向一个实例),分别服务于用于监听的服务器通道本身、与客户端建立起来的通道。这种分开的方式能够在极高并发的场景下接受新的连接
Netty允许使用单个事件循环组,同时负责接受连接、处理I/O,在很多应用场景下,这样做不会有什么问题。
通道处理器具有统一的接口:ChannelHandler,主要可以分为两个子类型:ChannelInboundHandler、ChannelOutboundHandler。从数据流的角度更容易理解这两类处理器,所谓入站(Inbound)是指数据是从对端(Peer)流向本地用户程序,出站(Outbound)则反之。
通道处理器应当在Bootstrap阶段即注册,其在管道(ChannelPipeline)中的顺序决定了其操控数据的顺序。管道的本质就是一系列排好序的通道处理器。
管道中的处理器,按照顺序依次被调用以处理数据(前提是处理器支持对目标数据的处理,例如ChannelInboundHandler能处理入站数据),并可将转换完的数据传递给下一个处理器,直到管道的结尾(Tail)。例如下图的场景:
入站消息处理流程:
- 任何通过入站事件(例如Read)进入管道的消息,会被管道中第一个入站处理器处理
- 第一个入站处理器可以处理此事件,或者什么都不做
- 第一个入站处理器将消息传递给下一个入站处理器
- 当管道中没有更多的入站处理器时,处理结束
出站消息处理流程
- 任何一个通过出站事件(例如Write)进入管道的消息,将从管道中最后一个出站处理器处理
- ……
- 当管道中没有更多出站处理器时,将出发实际的传输操作——例如通过套接字写入数据
当通道处理器被加入管道时,它就得到一个通道处理器上下文(ChannelHandlerContext)对象,除非使用类似UDP的数据报协议,可以安全的传递通道处理器上下文的引用。通道处理器上下文可以用来写入/发送消息。直接将消息写入通道与通过通道处理器上下文写入消息的区别是:直接写入到通道,会导致消息从管道的尾部开始被处理;通过通道处理器上下文写入,则从该通道处理器的下一个处理器开始。
Netty为通道处理器提供了一系列的适配器(Adapter),继承这些适配器类,只需要覆盖感兴趣的方法即可,其它的管道处理逻辑由父类提供。适配器类包括:ChannelHandlerAdapter、ChannelInboundHandlerAdapter、ChannelOutboundHandlerAdapter、ChannelDuplexHandlerAdapter。除了适配器类以外,Netty还提供了若干编码/解码类,用于方便的对消息进行编解码。
通过Netty进行消息收发时,需要进行编解码操作,因为只有字节才能在网络上传输:
- 在接收消息时,需要将字节转换为Java对象,这是一个解码(Decode)过程。解码器都是ChannelInboundHandler的实现
- 在发送消息时,需要将Java对象转换为字节,这是一个编码(Encode)过程。编码器都是ChannelOutboundHandler的实现
解码器实现类都会覆盖channelRead方法,在其中对数据进行解码,并调用ChannelHandlerContext.fireChannelRead(decodedMessage),这导致管道中下一个处理器接手处理解码后的消息(decoded message)。编码器的执行过程类似。
大部分应用程序可能仅仅希望获得已经解码完毕的消息,并执行必要的领域逻辑,可以继承SimpleChannelInboundHandler<T>并覆盖其channelRead0(ChannelHandlerContext, T)方法,在其中实现领域逻辑。参数化类型T就是你期望处理的消息类型。从字节到T的转换是解码器需要完成的工作。
网络应用都需要应用某种传输机制在网络上传送(Transfer)字节数据,传输机制可以是阻塞的/非阻塞的,也可以有多种实现方式,例如TCP、UDP,Netty进行了很好的抽象,当需要切换一种传输机制时,已编写的代码不需要做太多的改动。
下图展现了通道(Channel)类的层次结构与依赖关系:
可以看到,一个Channel持有一个ChannelPipeline、一个ChannelConfig。
ChannelConfig包含了通道的全部配置项信息,并且可以按需随时修改。通常每种传输都有自己特殊的配置信息,因此ChannelConfig被实现为多种子类型。
ChannelPipeline在前面简单的介绍过,其包含了若干ChannelHandler组成的链条。ChannelPipeline实现了拦截过滤器模式(Intercepting Filter Pattern)。ChannelPipeline也支持按需即时修改,可以动态的添加/删除ChannelHandler以实现高度灵活的应用程序。
Channel本身也提供了很多方法,例如:
方法 | 说明 | ||
eventLoop() | 返回该通道关联的事件循环 | ||
alloc() | 返回此通道使用的缓冲分配器 | ||
attr() | 返回一个通道属性对象(Attribute),返回值绝不为null,但是对其调用get()可能返回null | ||
pipeline() | 返回该通道的管道 | ||
isActive() | 判断通道是否是激活的(即连接到对端) | ||
localAddress() | 获取通道本端地址 | ||
remoteAddress() | 获取通道对端地址 | ||
write() | 写入数据给对端,数据将通过ChannelPipeline被处理,例如:
|
需要注意的是,Channel类是线程安全的,这意味着可以通过多个线程不加保护的访问同一Channel实例,在应用程序中多处传递Channel的实例,在需要时立刻通过它向对端发送消息是允许的,并且发送的顺序得到保证。
Netty已经包含了若干内置的Channel实现,这些实现均支持全部的协议类型:
名称 | 包 | 说明 |
NIO | io.netty.channel.socket.nio | 基于java.nio.channels构建,支持Selector操作。适用于高并发场景 |
OIO | io.netty.channel.socket.oio | 基于java.net package构建,使用阻塞的流方式处理I/O。适用于低连接数、极低延迟场景 |
Local | io.netty.channel.local | 在VM内部使用,通过管道进行通信。该传输与NIO一样,是完全异步的。服务器/客户端必须同时使用该传输。 |
Embedded | io.netty.channel.embedded | 可以用于测试新的ChannelHandler实现。该通道不需要真实的网络支持 |
每当需要传输数据的时候,必然牵涉到缓冲。出于性能优化、易用性的考虑,Netty没有使用Java NIO自带的Buffer实现,Netty提供的ByteBuf相当于NIO的ByteBuffer。
Netty的缓冲API包含两个接口:ByteBuf、ByteBufHolder。
Netty使用引用计数(reference-counting)机制来确定何时缓冲及其申请的内存资源可以被安全的释放,该机制有利于提高运行速度、保持内存使用量在一个合理的级别。
Netty的缓冲API具有以下优点:
- 如果必要,可以定义自己的缓冲类型
- 通过内置的复合缓冲类型,实现透明的零(内存)拷贝
- 缓冲容量按需自动扩展,就像StringBuffer一样
- 不需要调用flip()来切换读写模式,因为读写指针被分开
- 支持链式方法调用
- 基于引用计数
- 支持池化(Pooling)
通过ByteBuf,可以方便高效的添加、读取字节。为了方便读写,ByteBuf引入两个索引(指针),分别用于指示读、写的位置,ByteBuf允许进行顺序读,然后再跳回去继续读,你只需要调整读索引即可。
当数据被写入ByteBuf后,writerIndex增加相应的字节数。当执行read操作时,readerIndex增加,当readerIndex增大到writerIndex时,ByteBuf变为不可读,此时在进行read会导致IndexOutOfBoundsException。调用ByteBuf的任何read*、write*方法均会导致索引增加,相反的set*、get*不会移动索引值。
ByteBuf具有可选的最大容量(默认Integer.MAX_VALUE),尝试超过此容量限制进行写入会触发异常。
使用Netty时,你可能遇到3种不同类型的ByteBuf:
类型 | 说明 |
堆缓冲 |
最常见的类型,直接在Java堆中存储的缓冲类型,背后使用数组实现(hasArray返回true),在不使用缓冲池的情况下,该类型分配、回收的速度较快。通过非堆缓冲来访问array()方法会导致UnsupportedOperationException |
直接缓冲 | 所谓直接,是指在堆外,也就是物理机器上直接分配内存。直接缓冲之所以优化,是因为避免了一次内存拷贝(JVM在发送数据到Socket时,会将堆缓冲拷贝到直接内存)。直接缓冲的缺点是,分配、回收的代价比堆缓冲大。Netty用缓冲池来规避这一缺陷 |
组合缓冲 |
可以组合多个ByteBuf的实例,并且提供一个高层的视图。例如现代应用的消息经常被分为Head、Body两部分,有时候只需要修改Head,相同的Body可以发给不同的客户端,这种情况下组合缓冲可以减少内存拷贝。 相比之下,使用JDK的ByteBuffer无法达到此效果,只能创建一个新的、大的缓冲,将被组合的缓冲逐个拷贝进来 使用Netty的组合缓冲来实现分散/聚集,不会遇到NIO分散/聚集的性能问题 |
操作 | 说明 |
随机访问 | 指定一个index,可以进行读、写。支持读写字节、字节数组、字符、布尔等类型。这类操作不会修改读写索引,包括一系列的get/set方法 |
顺序访问 |
与上面类似,但是会移动读写索引,包括一系列的read/write方法。数据类型长度如下: 1 Boolean, Byte, UnsignedByte |
丢弃字节 | 方法discardReadBytes()可以丢弃已经被读取过的字节。该方法会导致内存拷贝,因为需要向前移动尚未被读取的字节,以便腾出空间供写入 |
是否可读写 | isReadable()、isWritable() 用于判断是否可读写(至少1字节) |
可读写字节数 | readableBytes()、writableBytes()用来计算当前缓冲中可读写的字节数 |
获取容量 | capacity()获取当前容量;maxCapacity()获取最大容量 |
清空 | clear()会把readerIndex、writerIndex都设置为0,但并不修改缓冲中的内容。该方法的语义与JDK的ByteBuffer.clear()不同。这是一个很廉价的操作 |
搜索操作 |
indexOf()可以用来搜索某个字节的出现位置,更复杂的搜索可以使用ByteBufProcessor bytesBefore()可以用来轻松的判断某个字符的位置,例如NULL字符 |
标记与重置 |
readerIndex()、writerIndex()可以标记或返回当前的读写索引位置。resetReaderIndex()、resetWriterIndex()则可以重置 |
导出的ByteBuf |
以下方法可以获得某个ByteBuf的视图:duplicate()、slice()、slice(int, int)、readOnly()、order(ByteOrder)。导出的ByteBuf具有独立的读、写、标记索引。这些视图与底层ByteBuf共享数据结构 |
拷贝ByteBuf |
copy()可以拷贝以生成一个全新的ByteBuf |
数组 |
hasArray()判断底层是否为数组;array()返回底层数组 |
引用计数 |
与引用计数相关的方法来自接口:ReferenceCounted retain:将引用计数增加1或者更多数目 |
该工具类可以方便的分配ByteBuf对象ChannelHandlerContext、Channel的alloc()方法,可以获得该工具类的实例。
目前Netty提供了两个ByteBufAllocator实现:
- PooledByteBufAllocator:其中一个使用类似于jemalloc的算法,实现了ByteBuf的池,该实现最小化了分配/回收内存的开销以及内存碎片。Netty默认使用该实现
- UnpooledByteBufAllocator:每次都是创建全新的ByteBuf实例,不进行池化处理
通过ChannelConfig可以很容易的切换上述两种实现。
ByteBufAllocator提供以下方法:
方法 | 说明 |
buffer() | 根据实现,自动分配堆或者直接缓冲 |
heapBuffer() | 分配堆缓冲 |
directBuffer() | 分配直接缓冲 |
compositeBuffer() | 分配组合缓冲,类似方法:heapCompositeBuffer()、directCompositeBuffer() |
ioBuffer() | 返回一个用于从套接字读取数据的缓冲 |
该类提供了一些静态方法,用来方便的创建ByteBuf:
方法 | 说明 |
buffer() | 创建一个非池化的堆缓冲 |
directBuffer() | 创建一个非池化的直接缓冲 |
wrappedBuffer() | 返回一个ByteBuf,其包装了既有数据 |
copiedBuffer() | 返回一个ByteBuf,其拷贝了既有数据 |
包含了一些常用的功能。
通道处理器可以:
- 处理消息格式转换
- 获取异常通知并处理
- 当通道激活/失活时得到通知
- 当通道从事件循环注册/解除注册时得到通知
- 处理用户定义事件
管道维持了通道处理器的列表,其实现了一种高级形式的拦截过滤器,允许:
- 控制事件的处理方式
- 控制不同通道处理器的交互方式
对于每一个新创建的通道,一个全新的ChannelPipeline被创建,并且附到通道上,一旦此关联关系建立,在通道的生命周期内均不可以解除。
对于入站数据,将从管道的头部进入,依次由各ChannelHandler处理;对于出站数据,则由管道的尾部进入。管道会判断ChannelHandler的类型,它不会让ChannelInboundHandler去处理出站数据。
管道提供了以下方法:
方法 | 说明 |
addFirst() | 添加ChannelHandler到管道头部 |
addBefore() | 添加ChannelHandler到某个处理器前面 |
addAfter() | 添加ChannelHandler到某个处理器后面 |
addLast() | 添加ChannelHandler到管道尾部 |
remove() | 从管道中移除一个处理器 |
replace() | 替换管道中的一个处理器 |
get() | 获取管道中的某个ChannelHandler |
context() | 获取管道中的某个ChannelHandler的ChannelHandlerContext |
contains() | 判断管道中是否包含指定名称/类型的ChannelHandler |
names() | 返回所有ChannelHandler的名称的集合 |
iterator() | 返回所有ChannelHandler的迭代器 |
入站操作 | |
fireChannelRegistered() | 导致调用管道中下一个ChannelInboundHandler的channelRegistered被调用 |
fireChannelUnregistered() | 导致调用管道中下一个ChannelInboundHandler的channelUnregistered被调用 |
fireChannelActive() | 导致调用管道中下一个ChannelInboundHandler的channelActive被调用 |
fireChannelInactive() | 导致调用管道中下一个ChannelInboundHandler的channelInactive被调用 |
fireExceptionCaught() | 导致调用管道中下一个ChannelInboundHandler的exceptionCaught被调用 |
fireUserEventTriggered() | 导致调用管道中下一个ChannelInboundHandler的userEventTriggered被调用 |
fireChannelRead() | 导致调用管道中下一个ChannelInboundHandler的channelRead被调用 |
fireChannelReadComplete() | 导致调用管道中下一个ChannelInboundHandler的channelReadComplete被调用 |
出站操作 | |
bind() | 请求将Channel绑定到一个本地地址。导致调用下一个ChannelOutboundHandler的bind(ChannelHandlerContext, SocketAddress, ChannelPromise)方法 |
connect() | 请求将Channel连接到一个远程地址。导致调用下一个ChannelOutboundHandler的connect(ChannelHandlerContext, SocketAddress, ChannelPromise) |
disconnect() | 请求断开通道的连接,导致调用下一个ChannelOutboundHandler的disconnect(ChannelHandlerContext, ChannelPromise) |
close() | 请求关闭通道,导致调用下一个ChannelOutboundHandler的close(ChannelHandlerContext, ChannelPromise) |
deregister() | 请求从事件循环中解除当前通道的注册,导致调用下一个ChannelOutboundHandler的deregister(ChannelHandlerContext, ChannelPromise) |
flush() | 请求刷空提到所有未决的写操作,导致调用下一个ChannelOutboundHandler的flush(ChannelHandlerContext)被调用 |
write() | 请求写入指定的数据到通道中,导致调用下一个ChannelOutboundHandler的write(ChannelHandlerContext, Object msg, ChannelPromise) |
writeAndFlush() | 请求写入并刷空,导致调用下一个ChannelOutboundHandler的writeAndFlush()被调用 |
read() | 请求从通道中读取更多的数据,导致调用下一个ChannelOutboundHandler的read(ChannelHanlderContext) |
举例:
1 2 3 4 5 6 7 8 9 |
ChannelPipeline pipeline = ..; FirstHandler firstHandler = new FirstHandler(); pipeline.addLast("handler1", firstHandler); pipeline.addFirst("handler2", new SecondHandler()); pipeline.addLast("handler3", new ThirdHandler()); pipeline.remove("handler3"); pipeline.remove(firstHandler); pipeline.replace("handler2", "handler4", new FourthHandler()); |
本文在前面提到过,绝不要在I/O线程中阻塞,否则将影响使用当前事件循环的所有通道的性能。但是有些时候阻塞操作必然的,比如需要进行JDBC操作。为了解决这个矛盾,Netty为所有ChannelPipeline.add*方法提供了一个可选的参数EventExecutorGroup。该参数用于执行ChannelHandler定义的方法。
一旦某个ChannelHandler被添加到管道中,一个新的通道处理器上下文(ChannelHandlerContext)被创建并关联到ChannelHandler,并永远不会被替换。此上下文允许当前处理器与其它处理器进行交互。
ChannelHandlerContext包含很多与ChannelPipeline、Channel相同签名的方法,其区别是:通过Channel/ChannelPipeline调用这些方法,会穿过整个管道;而通过ChannelHandlerContext调用,只会从当前处理器出发穿越剩余的管道部分。下面是用法示例:
1 2 3 4 5 6 7 8 9 10 |
ChannelHandlerContext ctx = getChannelHandlerContext(); //发起一个从管道尾部贯穿管道的写(出站)事件 Channel channel = ctx.channel(); channel.write( Unpooled.copiedBuffer( "Hello", CharsetUtil.UTF_8 ) ); //另外一种写法 ChannelPipeline pipeline = ctx.pipeline(); pipeline.write( Unpooled.copiedBuffer( "Hello", CharsetUtil.UTF_8 ) ); //直接使用通道处理器上下文: ctx.write( Unpooled.copiedBuffer( "Hello", CharsetUtil.UTF_8 ) ); |
前两个写入与第三个写入的差异,可以参考下图理解:
在ChannelHandler外面使用ChannelHandlerContext也是可以的,后者是线程安全的。
需要注意的是,包含了 @Sharable 注解的ChannelHandler类的同一个实例会被添加到不同的管道(即被多个Channel共享)中。如果尝试将没有@Sharable注解的同一实例添加到不同的管道,会抛出异常。使用这种共享的处理器时最好保证其是无状态的。
Netty引入了一个简单的状态模型来描述通道的生命周期,ChannelInboundHandler的方法映射到这些状态上:
状态 | 描述 |
channelUnregistered | 通道已经建立,但尚未注册到事件循环 |
channelRegistered | 通道已经注册到事件循环 |
channelActive | 通道已经激活,意味着已经与对端连接成功,可以收发数据 |
channelInactive | 通道没有连接到对端 |
通常情况下,通道的生命周期是由上表的四个状态顺序组成。在某些高级应用场景下,可能临时解除注册以暂停事件处理,这时就会有多于一个的注册/解除注册状态出现。不论如何,激活/失活状态只会出现一次,分别代表了连接的打开和关闭,如果需要在关闭后继续与对端通信,必须重新创建通道。
本文前面已经提到过,通道处理器分为两类:入站、出站,它们的功能划分如下:
处理器 | 功能 |
入站(Inbound)处理器 | 处理所有入站数据(接收到的数据)、以及所有的通道状态变更 |
出站(Outbound)处理器 | 处理所有出站数据(准备发出的数据)、并且允许拦截各种操作 |
处理器共同的父接口ChannelHandler提供了以下方法(这些方法都将ChannelHandlerContext作为一个参数):
方法 | 说明 |
handlerAdded() | 当处理器被添加到管道时,该方法被调用 |
handlerRemoved() | 当处理器被从管道移除时,该方法被调用 |
exceptionCaught() | 当执行处理时,发生异常时调用 |
ChannelInboundHandler提供了一系列的方法,当通道的生命周期状态改变、或数据抵达时,会自动调用:
方法 | 说明 |
channelRegistered() | 一旦某个通道被注册到事件循环,即调用 |
channelUnregistered() | 一旦通道从事件循环解除注册,即调用 |
channelActive() | 一旦通道变为活动的,即连接到对端,或者绑定监听完毕后,即调用 |
channelInactive() | 一旦通道与对端的连接断开,即调用 |
channelReadComplete() | 一旦读操作执行完毕,即调用 |
channelRead() | 一旦入站缓冲包含可读数据时,即调用 |
userEventTriggered() | 用户自定义事件被触发时调用 |
ChannelInboundHandlerAdapter类提供了ChannelInboundHandler的缺省适配,这些缺省适配的行为都是:转到下一个处理器继续处理。
需要注意的是,你覆盖的channelRead()方法要对缓冲的释放负责,特别在使用ByteBuf缓冲池的情况下,忘记释放(release)资源会导致内存泄漏。下面的例子展示了如何丢弃入站消息并进行资源释放:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
@Sharable public class DiscardInboundHandler extends ChannelInboundHandlerAdapter { private boolean discard = true; @Override public void channelRead( ChannelHandlerContext ctx, Object msg ) { //只要不调用ctx.fireChannelRead,就相当于丢弃了该消息,后续的处理器不会对其进行处理 //对于丢弃的消息,应当释放其资源 if ( discard ) ReferenceCountUtil.release( msg ); else ctx.fireChannelRead( msg ); } } |
如果存在资源忘记释放的情况,Netty会以WARN级别的日志发出警告。手工进行资源的释放是比较琐碎的工作,继承SimpleChannelInboundHandler类并覆盖channelRead0则不必手工释放。如果使用SimpleChannelInboundHandler,处理完毕后消息就被自动释放,不能保留消息的引用供后续使用。SimpleChannelInboundHandler对channelRead方法覆盖如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (acceptInboundMessage(msg)) { //如果msg的类型与SimpleChannelInboundHandler泛型参数的类型匹配,才进行处理 //前面的通道处理器应当已经将原始的ByteBuf转换为期望的消息类型 @SuppressWarnings("unchecked") I imsg = (I) msg; //调用channelRead0对消息进行处理 channelRead0(ctx, imsg); } else { //如果类型不匹配,则当前处理器不做任何操作,将其原封不动的传递给下一个处理器 release = false; ctx.fireChannelRead(msg); } } finally { //如果当前处理器配置了自动释放,并且对消息进行了处理,释放之 if (autoRelease && release) { ReferenceCountUtil.release(msg); } } } |
ChannelOutboundHandler提供了以下方法:
方法 | 说明 |
bind() | 一旦请求绑定到本地地址,即调用此方法 |
connect() | 一旦请求连接到对端,即调用此方法 |
disconnect() | 一旦请求对端断开连接,即调用此方法 |
close() | 一旦请求关闭通道,即调用此方法 |
deregister() | 一旦请求从事件循环解除当前通道的注册,即调用此方法 |
read() | 一旦请求从通道读取更多数据,即调用此方法 |
flush() | 一旦请求刷空数据发往对端,即调用此方法 |
write() | 一旦请求写入数据到通道,即调用此方法 |
几乎上表的全部方法要求一个ChannelPromise类型的参数,一旦请求停止继续在管道中前进时,应当调用ChannelPromise进行通知。
ChannelOutboundHandlerAdapter类提供了ChannelOutboundHandler的确省适配,它的实现都是调用ChannelHandlerContext的同名方法,将事件转给管道中下一个处理器处理。再次强调一下,下一个是指更靠近管道头部的、离当前处理器最近的出站处理器。下面的例子展示了如何丢弃出站消息并进行资源释放:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
@Sharable public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter { private boolean discard = true; @Override public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ) { if ( discard ) { //只要不调用ctx.write,就相当于丢弃了消息,后续处理器不会对其进行处理,数据也不会发送到对端 //对于丢弃的消息,应当释放其资源 ReferenceCountUtil.release( msg ); //必须通知ChannelPromise已经完成对消息的处理,否则调用channel.write时给出的ChannelFutureListener //不会被调用 promise.setSuccess(); } else { ctx.write( msg ); } } } |
选项 | 说明 |
ALLOCATOR | 设置字节缓冲的分配器,设置该选项以修改缓冲区的分配方式 |
RCVBUF_ALLOCATOR | 设置用于接收缓冲的分配器 |
MESSAGE_SIZE_ESTIMATOR | 设置消息长度估算器 |
CONNECT_TIMEOUT_MILLIS | 连接超时时间 |
MAX_MESSAGES_PER_READ | 单此读取时间最多从底层套接字读取的字节数 |
WRITE_SPIN_COUNT | 向输出缓冲写入数据时,自旋(循环操作)的次数,如果再循环内操作完成则立即终止循环 |
WRITE_BUFFER_HIGH_WATER_MARK | 写缓冲高水位,超过后Netty在内部标记一个不可写,直到水位降低到底水位以下 |
WRITE_BUFFER_LOW_WATER_MARK | 写缓冲低水位,低于该水位后,Netty在内部标记一个可写 |
ALLOW_HALF_CLOSURE | 是否允许半关闭套接字 |
SO_BROADCAST | 允许底层套接字发送广播数据报 |
SO_KEEPALIVE | 启用TCP保活 |
SO_SNDBUF | 给操作系统一个提示值,用来指示出站数据缓冲区的大小 |
SO_RCVBUF | 操作系统接收数据时,使用的缓冲区大小 |
SO_REUSEADDR | 启用套接字地址重用 |
SO_LINGER | 影响关闭套接字的方式,设置一个徘徊时间,超过后立即丢弃发送队列中的数据并立即关闭 |
SO_BACKLOG | 设置入站连接请求排队的数量,超过此数量的连接被拒绝 |
SO_TIMEOUT | 阻塞的套接字操作的超时时间 |
IP_TOS | 设置服务类型(TOS)字段,可以指定最小延时、最大吞吐量、最高可靠性、最小成本 |
IP_MULTICAST_ADDR | 设置用于多播的网络接口的地址。最终调用DatagramSocket.setInterface() |
IP_MULTICAST_IF | 对于多网络接口(Multihomed)主机,用于设置组播包的出口网卡。最终调用DatagramSocket.setNetworkInterface() |
IP_MULTICAST_TTL | 组播数据包的IP报文的TTL字段 |
IP_MULTICAST_LOOP_DISABLED | 禁止本地环回的多播数据报 |
TCP_NODELAY | 禁止TCP延迟确认算法 |
当编写网络应用时,往往需要实现某种编解码工具。该工具用完成原始字节到某种自定义消息格式的双向转换。Codec一般由一个编码器、一个解码器组成,前者用于出站数据、后者用于入站数据。
编解码工具本身属于ChannelHandler。
这类解码器可以将字节转换为某种形式的消息,消息本身甚至也是字节格式的。抽象类ByteToMessageDecoder提供了以下方法:
方法 | 说明 |
decode() | 该方法子类必须覆盖。用于将包含所有接收到的字节的ByteBuf进行解码,并将结果放入List中 |
decodeLast() | 该方法仅仅会在通道变为inactive调用一次,如果需要特殊化处理,可以覆盖此方法 |
考虑如下场景:对端发送一系列的字节过来,其中包含若干Integer类型的整数,本端需要把这些整数分离出来,逐个的传递给后面的ChannelHandler处理,如下图:
示意代码如下:
1 2 3 4 5 6 7 8 9 10 11 |
public class ToIntegerDecoder extends ByteToMessageDecoder { @Override public void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out ) throws Exception { if ( in.readableBytes() >= 4 ) { out.add( in.readInt() ); } } } |
可以看到,上述decode方法只是对输入缓冲执行一次解码,尝试得到一个Integer,但如果输入缓冲中包含很多Integer,如何处理呢?ByteToMessageDecoder已经封装好循环调用deocde方法的逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { //如果输入缓冲可读,则循环处理 while (in.isReadable()) { int outSize = out.size(); //解码结果列表的长度 int oldInputLength = in.readableBytes(); //当前输入缓冲可读数 decode(ctx, in, out); //调用子类实现的解码方法,执行解码 if (ctx.isRemoved()) { break; } //如果解码前后,结果列表长度没有变化、并且输入长度也没有变化 //说明当前的输入缓冲尚不够解码下一个消息,因此需要退出循环 if (outSize == out.size()) { if (oldInputLength == in.readableBytes()) { break; } else { continue; } } //结果列表增加了,但是输入缓冲没有消耗,这是非法的,抛出异常 if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } //如果只解码一次,那么不作循环处理 if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Throwable cause) { throw new DecoderException(cause); } } |
解码过程执行完毕后,如何将结果列表依次发送给下一个ChannelHandler处理呢?这个逻辑也由ByteToMessageDecoder实现了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { //构建解码输出列表,Netty使用了私有的ArrayList子类,主要是出于性能方面的考虑 RecyclableArrayList out = RecyclableArrayList.newInstance(); try { ByteBuf data = (ByteBuf) msg; first = cumulation == null; if (first) { //如果是第一次解码,将累积缓冲(cumulation)设置为当前输入缓冲 cumulation = data; } else { //否则,将上次没解码完的多余数据和本次接收到的数据累加起来,作为累积缓冲 //由于TCP的特征,数据可能呈碎片化到达,因此某一次channelRead过程中遇到无法解码的结尾部分是很常见的 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } //循环的进行解码 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { //即使有异常抛出,也会调用后面的处理器 //如果累积缓冲没有数据,将其回收 if (cumulation != null && !cumulation.isReadable()) { cumulation.release(); cumulation = null; } int size = out.size(); //遍历解码结果,逐个交由下游处理器处理 for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.get(i)); } out.recycle(); } } else { //如果输入消息不是ByteBuf,那么当前解码器不能处理,交由下一个ChannelHandler处理 ctx.fireChannelRead(msg); } } |
该类型是ByteToMessageDecoder的一种特殊形式,主要解决难以预先(除非读取到结尾)知晓输入缓冲中是否还有足够的数据供解码(下一个输出)的问题,继承该类型不需要进行预读判断:
1 2 3 4 5 6 7 8 9 10 11 |
public class ToIntegerDecoder extends ReplayingDecoder<Void> { @Override public void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out ) throws Exception { //前面不需要判断输入缓冲是否有足够4字节 //如果读取失败,一个特殊的Signal错误被抛出,ReplayingDecoder会自动处理并终止解码循环 out.add( in.readInt() ); } } |
以下是ReplayingDecoder的核心代码,可以看到它利用一个ReplayingDecoderBuffer来装饰输入缓冲,ReplayingDecoderBuffer会在执行读取方法时进行检查,检查失败时(不能读取)会抛出一个Signal,ReplayingDecoder捕获此信号,将输入缓冲的读索引回滚到检查点上
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
//一个专用的ByteBuf子类,用于实现“回放”(重新读取) private final ReplayingDecoderBuffer replayable = new ReplayingDecoderBuffer(); @Override protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { replayable.setCumulation(in); //包裹原始输入以支持回放 try { while (in.isReadable()) { int oldReaderIndex = checkpoint = in.readerIndex(); int outSize = out.size(); //泛型参数通常是一个枚举类型,如果不需要状态管理,则使用Void作为泛型参数 S oldState = state; int oldInputLength = in.readableBytes(); try { //解码时需要调用缓冲replayable的方法,该类的读取方法在无法读出请求的数据时会抛出Signal decode(ctx, replayable, out); if (ctx.isRemoved()) { break; } if (outSize == out.size()) { //输出结果未增加 if (oldInputLength == in.readableBytes() && oldState == state) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() must consume the inbound " + "data or change its state if it did not decode anything."); } else { //数据已经被消耗,或者导致了状态变迁,可能需要继续往下读 //例如,我们使用NUL字符分隔消息,如果按字节读的话,可能多次执行到这里 continue; } } } catch (Signal replay) { //尝试读取失败了 replay.expect(REPLAY); if (ctx.isRemoved()) { break; } //返回到检查点(checkpoint) int checkpoint = this.checkpoint; if (checkpoint >= 0) { in.readerIndex(checkpoint);//设置读索引 } else { } break; //退出循环,不再继续解码,等待下一次事件驱动 } //必须要消耗输入缓冲或者改变解码状态,否则说明程序有问题,可能无限循环 if (oldReaderIndex == in.readerIndex() && oldState == state) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() method must consume the inbound data " + "or change its state if it decoded something."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Throwable cause) { throw new DecoderException(cause); } } |
下面的代码是ReplayingDecoderBuffer的代码片段,可以看到它在各种读取方法执行前,进行了检查:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
final class ReplayingDecoderBuffer extends ByteBuf { private static final Signal REPLAY = ReplayingDecoder.REPLAY; //单例异常类 private ByteBuf buffer; //被装饰的字节缓冲 @Override public byte readByte() { checkReadableBytes(1); return buffer.readByte(); } @Override public ByteBuf readBytes(byte[] dst, int dstIndex, int length) { checkReadableBytes(length); buffer.readBytes(dst, dstIndex, length); return this; } private void checkReadableBytes(int readableBytes) { //如果不可读(没有足够的字节数),抛出信号 if (buffer.readableBytes() < readableBytes) { throw REPLAY; } } } |
该类型是参数化的,其限定了输入消息的类型,至于输出什么类型不是该类关心的内容,其代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter { private final TypeParameterMatcher matcher; protected MessageToMessageDecoder() { matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I"); } public boolean acceptInboundMessage(Object msg) throws Exception { return matcher.match(msg); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { RecyclableArrayList out = RecyclableArrayList.newInstance(); try { if (acceptInboundMessage(msg)) { //如果输入消息与当前类的参数类型匹配,则执行解码 I cast = (I) msg; try { decode(ctx, cast, out); } finally { //如果接收输入,必须尝试释放输入消息 ReferenceCountUtil.release(cast); } } else { //否则,只是简单的将其加入到输出中 out.add(msg); } } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { int size = out.size(); for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.get(i)); } out.recycle(); } } protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception; } |
该类很巧妙的利用参数化类型实现了过滤器,事实上我们可以在管道中放入各种MessageToMessageDecoder的子类型,他们只会处理类型匹配的消息。通过把多个这样的解码器串联使用,可以实现复杂消息的解码,对逻辑进行分离以便重用。
这是一个参数化类型抽象类,用于将某种消息转换为字节缓冲(ByteBuf),消息的类型通过类型参数指定。该类提供以下方法:
方法 | 说明 |
encode() | 处理出站消息,并将其转换为字节缓冲 |
在解码器一节中,我们举了一个将输入缓冲解码为一个个整型的例子,这里我们将实现一个逆过程——将整型转换为字节流:
1 2 3 4 5 6 7 8 |
public class IntegerToByteEncoder extends MessageToByteEncoder<Integer> { @Override public void encode( ChannelHandlerContext ctx, Integer msg, ByteBuf out ) throws Exception { out.writeInt( msg ); //将整数写入到输出缓冲 } } |
该类也是一个参数化类型,也只有一个必须覆盖的方法:
1 |
protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception; |
可以看到,此编码器用于将一个输入消息编码为多个输出消息的列表。
有些时候需要实现编解码器对,并将其封装到一个类中,此时可以继承各种Codec类:
Codec类 | 说明 |
ByteToByteCodec | 在字节与字节之间进行编解码 |
ByteToMessageCodec | 在字节与消息之间进行编解码 |
MessageToMessageCodec | 在消息与消息之间进行编解码 |
以上各种Codec类型都是ChannelDuplexHandler的子类,而后者:
1 |
public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {} |
同时实现了入站、出站通道处理器接口,因此,Code类可以直接加入到ChannelPipeline中。
SSL/TLS为很多高层协议提供了安全支持,例如HTTPS、SMTPS。JDK提供了SslContext、SslEngine类对SSL提供支持,Netty继承了JDK的SSL组件并提供了额外的功能,使其易于被Netty应用程序使用。
通道处理器SslHandler用于处理与SSL相关的逻辑,通常可以使用ChannelInitializer来将此处理器加入到管道的头部,例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
public class SslChannelInitializer extends ChannelInitializer<Channel> { private final SSLContext context; //当进行SSL握手时,作为客户端还是服务器端 private final boolean client; //如果设置为真,第一次写请求不被SSLEngine加密 private final boolean startTls; public SslChannelInitializer( SSLContext context, boolean client, boolean startTls ) { this.context = context; this.client = client; this.startTls = startTls; } @Override protected void initChannel( Channel ch ) throws Exception { //获取一个新的SSL引擎,每个SSL处理器使用独立的引擎 SSLEngine engine = context.createSSLEngine(); //根据当前Netty应用是客户端还是服务器进行设置 engine.setUseClientMode( client ); //添加到管道最前面,大部分情况下SSL处理器都应该是第一个 ch.pipeline().addFirst( "ssl", new SslHandler( engine, startTls ) ); } } |
SslHandler提供了一些特殊的方法,可以用于修改其行为,或者在SSL/TLS握手完毕后获得通知:
方法 | 说明 |
setHandshakeTimeout() | 设置握手超时的秒数,超时后握手的ChannelFuture得到通知,类似还有setHandshakeTimeoutMillis() |
getHandshakeTimeoutMillis() | 获取握手超时毫秒数 |
setCloseNotifyTimeout() | 设置关闭超时时间,超时后连接会关闭,并导致关闭通知失败 |
handshakeFuture() | 返回一个ChannelFuture,在握手完毕后得到通知 |
close() | 发送一个close_notify以请求关闭并销毁底层的SslEngine |
关于HTTP协议的基本知识,参考:HTTP协议学习笔记
Netty内置了若干用于处理HTTP协议的编解码器,可以方便的基于Netty编写HTTP服务器或者客户端。
从逻辑上,HTTP报文可以分为头、分为多块传输的报文体、以及标记报文结尾、附加了HTTP头的尾部。HTTP协议的编解码器必须识别这些部分并做出合适的处理。下表是Netty提供的相关编解码器:
编解码器 | 说明 |
HttpRequestEncoder | 将HttpRequest、HttpContent消息转换为字节 |
HttpResponseEncoder | 将HttpResponse、HttpContent消息转换为字节 |
HttpRequestDecoder | 将字节解码为HttpRequest、HttpContent |
HttpResponseDecoder | 将字节解码为HttpResponse、HttpContent |
按如下方式修改ChannelPipeline,可以添加对HTTP协议的支持:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
public class HttpDecoderEncoderInitializer extends ChannelInitializer<Channel> { //当前应用是作为HTTP客户端还是服务器 private final boolean client; public HttpDecoderEncoderInitializer( boolean client ) { this.client = client; } @Override protected void initChannel( Channel ch ) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //下面的添加的处理器亦可改为:HttpClientCodec、HttpServerCodec if ( client ) { pipeline.addLast( "decoder", new HttpResponseDecoder() ); pipeline.addLast( "encoder", new HttpRequestEncoder() ); } else { pipeline.addLast( "decoder", new HttpRequestDecoder() ); pipeline.addLast( "encoder", new HttpResponseEncoder() ); } } } |
在管道中使用了上述编解码器后,你就可以使用各种HttpObject类型的消息了。但是由于HTTP消息可能分散为多个消息,处理它们比较繁琐,Netty提供了聚合工具,可以将HttpObject聚合为完整的FullHttpRequest、FullHttpResponse消息,要使用此聚合器,需要再HTTP编解码器后面添加:
1 2 |
//限制消息的最大长度为512KB,如果超过会导致TooLongFrameException pipeline.addLast("aggegator", new HttpObjectAggregator(512 * 1024)); |
Netty支持开箱即用的HTTP压缩,可以节省网络流量:
1 2 3 4 5 6 7 8 9 10 |
if ( client ) { pipeline.addLast( "codec", new HttpClientCodec() ); pipeline.addLast( "decompressor", new HttpContentDecompressor() ); } else { pipeline.addLast( "codec", new HttpServerCodec() ); pipeline.addLast( "decompressor", new HttpContentDecompressor() ); } |
如果要启用HTTPS,只需要将SslHandler放到管道的最前面。
使用TCP长连接,通常需要处理空闲连接与超时。
对于处于空闲状态的TCP连接,为了防止对端已经不活动(比如程序崩溃)了而本端却不知道,常常采用“心跳”报文来处理(对于TCP协议来说,使用TCP保活定时器,可以在传输层完成心跳,与应用层心跳各有优劣),如果对端对心跳报文进行响应,说明它还是活动的,否则,则说明连接可能已经无效,需要关闭。
另外一种处理空闲连接的方式是超时,即对端如果空闲过长时间,直接将连接关闭。
Netty提供了若干与空闲连接、超时相关的通道处理器:
处理器类 | 说明 |
IdleStateHandler | 该处理器在连接空闲过长时间后,触发一个IdleStateEvent事件,可以监听并处理 |
ReadTimeoutHandler | 在超过一定时间后没有入站消息到达,触发一个ReadTimeoutException异常,并且关闭通道 |
WriteTimeoutHandler | 在超过一定时间后没有出站消息需要发送,触发一个WriteTimeoutException异常,并且关闭通道 |
下面是一个心跳处理的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> { @Override protected void initChannel( Channel ch ) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //在空闲60秒后,此处理器被激,触发,调用下一个处理器的userEventTriggered //IdleStateHandler构造器的前三个参数:读空闲时间、写空闲时间、都空闲时间 pipeline.addLast( new IdleStateHandler( 0, 0, 60, TimeUnit.SECONDS ) ); //在尾部添加心跳处理器 pipeline.addLast( new HeartbeatHandler() ); } public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter { private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer( Unpooled.copiedBuffer( "HEARTBEAT", CharsetUtil.ISO_8859_1 ) ); @Override public void userEventTriggered( ChannelHandlerContext ctx, Object evt ) throws Exception { if ( evt instanceof IdleStateEvent ) { //发送心跳报文,如果对端不响应,关闭连接 ctx.writeAndFlush( HEARTBEAT_SEQUENCE.duplicate() ).addListener( ChannelFutureListener.CLOSE_ON_FAILURE ); } else { super.userEventTriggered( ctx, evt ); } } } } |
常见的基于分隔符的协议包括:SMTP、POP3、IMAP、Telnet,很多用户定义协议也是基于分隔符来分割不同的报文的,分割后的每一个结果在Netty中被称为帧(Frame)。Netty提供以下基于分隔符的解码器:
解码器类 | 说明 |
DelimiterBasedFameDecoder | 从输入中使用某种分隔符来分割出帧 |
LineBasedFrameDecoder | 以 \r \n 作为分隔符来分割出帧 |
Netty还提供了以下两种定长协议的解码器
解码器类 | 说明 |
FixedLengthFrameDecoder | 根据绝对定长方式进行帧分割 |
LengthFieldBasedFrameDecoder | 根据报文头中的某个指定了报文长度的字段来确定帧长度,进而分割 |
对于异步I/O框架来说,如何高效的写入大块数据往往是个挑战。因为当网络饱和(缓冲区已满)后,应当停止继续写入,否则可能导致OutOfMemoryError。
Netty允许基于零内存拷贝(zero-memory-copy)方式来处理文件内容的写入,它可以让文件系统到网络栈的数据传输全部在内核空间完成并提供最优化的性能。使用零内存拷贝的前提是应用程序不需要对文件的内容进行任何修改(类似于HTTP服务中的静态文件),下面是零内存拷贝的文件传输的代码示例:
1 2 3 4 5 6 7 8 9 10 |
FileInputStream in = new FileInputStream( file ); //使用FieRegion可以指定一个文件的范围 FileRegion region = new DefaultFileRegion( in.getChannel(), 0, file.length() ); //零内存拷贝的写入文件 channel.writeAndFlush( region ).addListener( new ChannelFutureListener() { public void operationComplete( ChannelFuture future ) throws Exception { if ( !future.isSuccess() ) ... } } ); |
如果要传送的不是文件,而是大块的数据,又如何呢?Netty提供了ChunkedWriteHandler这一特殊的处理器,可以用来写入大块的数据,这要这些数据以ChunkedInput实现的形式提供,内置的实现包括:
ChunkedInput实现 | 说明 |
ChunkedFile | 允许写入一个文件(仅当不能使用零内存拷贝时使用) |
ChunkedNioFile | 基于NIO,允许写入一个文件(仅当不能使用零内存拷贝时使用) |
ChunkedNioStream | 允许传送来自NIO包的ReadableByteChannel的数据流 |
ChunkedStream | 允许传送来自InputStream的数据流 |
下面是使用块传输的代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> { private final File file; public ChunkedWriteHandlerInitializer( File file ) { this.file = file; } @Override protected void initChannel( Channel ch ) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //该处理器要加在前面,因为它要接受ChunkedInput类型作为输出 pipeline.addLast( new ChunkedWriteHandler() ); //该处理器用于将输入包装为ChunkedInput pipeline.addLast( new WriteStreamHandler() ); } public final class WriteStreamHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive( ChannelHandlerContext ctx ) throws Exception { super.channelActive( ctx ); //连接建立后,发送一个文件的内容到对端 ctx.writeAndFlush( new ChunkedStream( new FileInputStream( file ) ) ); } } } |
当需要通过网络传递POJO时,就牵涉到串行化/反串行化的问题。
JDK提供的ObjectOutputStream/ObjectInputStream以及标记接口Serializable来支持串行化。Netty提供了以下编解码器与之对接:
编解码器类 | 说明 |
CompatibleObjectDecoder | 使用JDK反串行化机制来解码,可以与不使用Netty的对端配合 |
CompatibleObjectEncoder | 使用JDK串行化机制来编码,可以与不使用Netty的对端配合 |
JBoss Marshalling是一个第三方串行化框架,比JDK串行化机制快三倍。Netty提供了以下编解码器,可以与JBoss Marshalling对接:
编解码器类 | 说明 |
MarshallingEncoder | 用于串行化处理 |
MarshallingDecoder | 用于反串行化处理 |
下面是一个配合JBoss Marshalling使用的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
public class MarshallingInitializer extends ChannelInitializer<Channel> { private final MarshallerProvider marshallerProvider; private final UnmarshallerProvider unmarshallerProvider; public MarshallingInitializer( UnmarshallerProvider unmarshallerProvider, MarshallerProvider marshallerProvider ) { this.marshallerProvider = marshallerProvider; this.unmarshallerProvider = unmarshallerProvider; } @Override protected void initChannel( Channel channel ) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //注册解码、编码器。对端必须也使用JBoss Marshalling pipeline.addLast( new MarshallingDecoder( unmarshallerProvider ) ); pipeline.addLast( new MarshallingEncoder( marshallerProvider ) ); pipeline.addLast( new ObjectHandler() ); } public static final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> { @Override public void channelRead0( ChannelHandlerContext channelHandlerContext, Serializable serializable ) throws Exception { //对反串行化后的对象进行处理 } } } |
ProtoBuf是由Google开源的、用于方便的编解码结构化数据的框架,该框架具有多种语言的绑定,非常适合跨语言项目的模块之间的通信。Netty提供了以下编解码器实现:
编解码器类 | 说明 |
ProtobufDecoder | 基于ProtoBuf进行解码 |
ProtobufEncoder | 基于ProtoBuf进行编码 |
ProtobufVarint32FrameDecoder | 基于Varint32进行帧分割 |
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
public class ProtoBufInitializer extends ChannelInitializer<Channel> { private final MessageLite lite; public ProtoBufInitializer( MessageLite lite ) { this.lite = lite; } @Override protected void initChannel( Channel ch ) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast( new ProtobufVarint32FrameDecoder() ); //用于帧分割 pipeline.addLast( new ProtobufEncoder() ); pipeline.addLast( new ProtobufDecoder( lite ) ); pipeline.addLast( new ObjectHandler() ); } public static final class ObjectHandler extends SimpleChannelInboundHandler<Object> { @Override public void channelRead0( ChannelHandlerContext ctx, Object msg ) throws Exception { } } } |
Netty应用程序有两种启动类型:
- 一种作为类似服务器(Server-like)的通道启动,可以接受连接,并且为每一个接受的连接创建子通道
- 另外一种作为类似于客户端的通道启动,它不会创建子通道,所有操作都在父通道中完成。这类通道不但用于TCP连接的客户端,还用于无连接通信的服务器端
由于很多通道使用相同或者类似的属性,为了简化处理,Netty使AbstractBootstrap类可克隆。克隆时,配置信息将被深拷贝,而底层的事件循环组则仅仅会浅拷贝。
需要启动客户端或者无连接协议时,必须使用Bootstrap类,该类提供以下方法
方法 | 说明 |
group() | 设置用于处理I/O的事件循环组。事件循环组的类型必须与通道类型匹配,不能使用OIO的事件循环组处理NIO通道 |
channel() | 实例化通道时使用的类 |
channelFactory() | 如果通道不能基于0-arg构造器创建,可以提供一个工厂类 |
localAddress() | 通道绑定到的本地地址,如果不指定,将由操作系统随机指定。可以在bind()、connect()时指定本地地址 |
option() | 需要应用到通道的ChannelConfig的ChannelOptions。这些选项会在bind()、connect()被调用时,设置到通道上。在此之后改变这些选项没有意义。支持哪些ChannelOptions取决于具体的通道类型 |
attr() | 在bind()、connect()时,为目标通道设置属性,在这两个方法之后设置无效 |
handler() | 设置通道处理器 |
clone() | 克隆当前Bootstrap对象 |
remoteAddress() | 设置需要连接的远程地址,可以在connect()时设置 |
connect() | 尝试连接到对端并返回一个ChannelFuture |
bind() | 尝试绑定一个本地地址并返回一个ChannelFuture,在绑定成功后,可能需要通过connect连接到对端 |
需要启动服务器端时,必须使用ServerBootstrap类,该类提供以下方法:
方法 | 说明 |
group() | 指定事件循环组,可以指定两个,一个单纯用于接受连接,另外一个用于处理子通道的I/O |
channel() | 用于创建服务器通道(ServerChannel)实例的类 |
channelFactory() | 创建服务器通道的工厂 |
localAddress() | 服务器通道绑定的本地地址 |
option() | 服务器通道的选项 |
childOption() | 子通道的选项。每当accept一个客户端连接后,服务器通道就会创建一个子通道 |
attr() | 服务器通道的属性 |
childAttr() | 子通道的属性 |
handler() | 服务器通道的处理器,一般不需要指定 |
childHandler() | 子通道的处理器 |
clone() | 克隆配置 |
bind() | 绑定到本地地址并且返回一个ChannelFuture |
假设你正在编写一个代理服务器,可能遇到如下的应用场景:
- 代理服务器启动监听
- 客户端连接到代理服务器,要求访问某个URL
- 代理服务器访问第三方服务器,访问URL并获取数据
- 代理服务器将数据返回给客户端
在上面的情况下,需要在通道内部启动一个客户端通道:第3、4部通常发生在入站请求事件的处理方法channelRead中,这是你可以创建一个全新的Bootstrap供客户端使用,但是更好的做法是复用代理服务器的事件循环,以减少资源消耗:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
@Override public void channelActive( ChannelHandlerContext ctx ) throws Exception { //一旦客户端连接到此代理服务器 Bootstrap bootstrap = new Bootstrap(); //就尝试启动一个客户端,连接gmem.cc获取内容 bootstrap .channel( NioSocketChannel.class ) .handler( new SimpleChannelInboundHandler<ByteBuf>() { @Override protected void channelRead0( ChannelHandlerContext ctx, ByteBuf in ) throws Exception { //在此读取gmem.cc发来的内容 } } ); //注意:这里我们让客户端直接使用了代理服务器的事件循环组 bootstrap.group( ctx.channel().eventLoop() ); bootstrap.connect( new InetSocketAddress( "http://gmem.cc", 80 ) ); } |
每个通道设置选项是非常繁琐的事情,幸好Netty提供了在Bootstrap时为通道预先设置选项的接口options,该接口允许指定ChannelOptions给未来创建的通道。这些选项可以使用低级别的API,以控制连接的各项特性。
属性(Attributes)则允许你将自定义数据(例如用户身份信息)与通道(线程)安全的进行关联。
UDP是无连接的协议,性能要求高(比起TCP,速度相当快)、对丢包较为容忍的应用场景下适合使用UDP。DNS就是一种基于UDP的常见协议。
本节将以一个模拟Syslog的广播应用场景,来说明如何通过Netty实现UDP通信:
- 读取日志文件的内容
- 将每一行广播出去
- 网络上所有的订阅者监听并获取广播内容
广播类(发布者)代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
public class LogEventBroadcaster { public static void main() throws Exception { //受限的广播地址,发往这类地址的数据报不会被路由器转发出去 final InetSocketAddress BCAST_ADDR = new InetSocketAddress( "255.255.255.255", Helper.DEFAULT_PORT ); EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group( group ) //使用基于NIO的数据报通道类 .channel( NioDatagramChannel.class ) //该选项指明使用广播方式发送数据 .option( ChannelOption.SO_BROADCAST, true ) //处理出站的LogEvent类型的数据 .handler( new MessageToMessageEncoder<LogEvent>() { @Override protected void encode( ChannelHandlerContext ctx, LogEvent event, List<Object> out ) throws Exception { ByteBuf buf = ctx.alloc().buffer(); buf.writeBytes( event.getMsg().getBytes( CharsetUtil.UTF_8 ) ); //将日志事件封装为数据报,指定目的地址为广播地址 //Netty会自动处理此类型的消息,并执行广播 out.add( new DatagramPacket( buf, BCAST_ADDR ) ); } } ); try { Channel ch = bootstrap.bind().syncUninterruptibly().channel(); File file = new File( "syslog" ); boolean eof = false; int line = 0; while ( !eof ) { LogEvent event = readLineAsEvent( file, line++ ); ch.write( event ); } } finally { group.shutdownGracefully(); } } private static LogEvent readLineAsEvent( File file, int line ) { //从文件中读取一行 String content = read( file, line ); return new LogEvent( content ); } } |
监听类(订阅者)代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
public class LogEventMonitor { public static void main( String[] args ) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); bootstrap.group( group ) //UDP客户端与服务端的配置一样 .channel( NioDatagramChannel.class ) .option( ChannelOption.SO_BROADCAST, true ) .handler( new ChannelInitializer<Channel>() { @Override protected void initChannel( Channel channel ) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //将UDP数据报解析为LogEvent pipeline.addLast( new MessageToMessageDecoder<DatagramPacket>() { @Override protected void decode( ChannelHandlerContext ctx, DatagramPacket pkt, List<Object> out ) throws Exception { ByteBuf data = pkt.content(); out.add( new LogEvent( data.toString( CharsetUtil.UTF_8 ) ) ); } } ); pipeline.addLast( new SimpleChannelInboundHandler<LogEvent>() { @Override protected void channelRead0( ChannelHandlerContext ctx, LogEvent msg ) throws Exception { //处理日志 } } ); } } ); try { //绑定UDP端口以监听 Channel ch = bootstrap.bind( Helper.DEFAULT_PORT ).syncUninterruptibly().channel(); ch.closeFuture().await(); } finally { group.shutdownGracefully(); } } } |
多播与广播类似,下面是一个支持收发多播报文的代码样例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
public class Multicaster { private static final Logger LOGGER = LoggerFactory.getLogger( Multicaster.class ); public static void main( String[] args ) throws Exception { Bootstrap bootstrap = new Bootstrap(); NioEventLoopGroup group = new NioEventLoopGroup(); bootstrap.group( group ) //基于NIO的多播,必须要求JDK1.7+ .channelFactory( new ChannelFactory<Channel>() { @Override public Channel newChannel() { //强制使用IPv4套接字,否则可能报错: //IPv6 socket cannot join IPv4 multicast group return new NioDatagramChannel( InternetProtocolFamily.IPv4 ); } } ) .handler( new ChannelInitializer<Channel>() { @Override protected void initChannel( Channel ch ) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast( new ChannelInboundHandlerAdapter() { public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exception { DatagramPacket pkt = (DatagramPacket) msg; LOGGER.debug( "Received message from multicast group: {}", pkt.content().toString( CharsetUtil.UTF_8 ) ); ReferenceCountUtil.release( msg ); } } ); pipeline.addLast( new ChannelOutboundHandlerAdapter() { public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ) throws Exception { LOGGER.debug( "Message to be sent to multicast group: {}", msg ); super.write( ctx, msg, promise ); } } ); } } ); try { int gport = 6155; InetSocketAddress gaddr = new InetSocketAddress( "239.255.2.3", gport ); //要监听来自多播组的信息,必须绑定到多播端口对应的通配符地址 NioDatagramChannel ch = (NioDatagramChannel) bootstrap.bind( gport ).syncUninterruptibly().channel(); //多播地址总是需要跟一个网络接口关联的,这里使用IPv4的环回网卡 NetworkInterface nif = NetworkInterface.getByInetAddress( NetUtil.LOCALHOST4 ); //加入多播组,之后,当前套接字才能针对多播组收发消息 ch.joinGroup( gaddr, nif ).sync(); //向多播组发送信息 ByteBuf msg = Unpooled.copiedBuffer( "My bonnie is over the ocean.", CharsetUtil.UTF_8 ); ch.writeAndFlush( new DatagramPacket( msg, gaddr ) ).sync(); //离开多播组 ch.leaveGroup( gaddr, nif ).sync(); ch.close(); } finally { group.shutdownGracefully(); } } } |