Java NIO浅析
I/O是计算机与外部世界或者一个程序与计算机的其余部分的之间的接口,是操作系统的重要组成部分,I/O核心功能通常是操作系统内核的一部分。
NIO即“New I/O”(也有人将其解释为Non-blocking I/O),是JDK 1.4引入的新的输入/输出API,它提供高速的、基于“块”的 I/O,比起Java传统基于“流”的I/O组件具有更好的性能。
NIO支持异步的IO操作。一般在进行 read() 调用时,会阻塞直至有可供读取的数据;同样, write() 调用会阻塞直至数据能够写入,异步IO则是一种不阻塞的读写数据的方法,使用异步IO,既不需要多个大量线程,也不需要使用轮询。
通道(Channel)和缓冲区(Buffer)是NIO中最核心的两个对象,几乎所有IO操作都需要用到。
Channel相当于Java传统IO的流对象,任何数据都需要通过一个Channel对象来接受/发送。
与InputStream、OutputStream不同的是,Channel是双向的,可以读、可以写,也可以读写。Channel更加匹配底层操作系统模型,在Unix-like操作系统上,底层通道就是双向的。
最常用的通道子类包括:
- FileChannel:用于执行文件读写。该类提供以下特有方法:
方法 说明 lock() 可以进行文件锁定,包括部分锁定/完全锁定,锁定的方式包括共享锁/独占锁。为兼容性考虑,应当尽量只使用独占锁 position() 获取或者设置FileChannel当前所处的位置:
1234#获取当前位置long pos = fc.position();#设置当前位置,跳转到1KB之后fc.position(pos + 1024);size() 返回关联的文件的尺寸 truncate() 将文件截断到指定的大小 force() 将通道里尚未写入磁盘的数据强制写到磁盘上 - DatagramChannel:用于通过UDP协议读写数据
- SocketChannel:用于通过TCP读写数据
- ServerSocketChannel:作为服务器端套接字,来监听新的TCP连接,对于每个新连接会创建一个SocketChannel对象
Buffer则相当于数据的容器,发送给一个Channel的数据首先必须放到Buffer中;从一个Channel接收到的数据也必须首先读取到Buffer中。任何时候,都不会直接对Channel进行数据读写。
最常用的Buffer子类是ByteBuffer,此外所有Java基本类型都具有对应的Buffer,包括ByteBuffer、CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer
一般情况下,使用Buffer进行读写的步骤如下:
- 写入数据到Buffer,如果Buffer非空,新数据将附加在原有数据后面
- 调用flip方法,切换到读模式
- 从Buffer中读取数据
- 调用clear或compact方法,进行清理
Buffer以下属性和方法非常重要:
属性/方法 | 说明 |
capacity | 即Buffer对应的内存块的大小,一旦Buffer已满,必须读取或者清除其数据才能继续写入 |
position |
写模式,position表示当前的位置,初始值为0,写一个数据则++ 读模式,开始时有flip将position置0,每读一次,position移动到下一个可读位置 |
limit |
写模式,limit表示最多可以向其中写入多少数据 读模式,limit表示最多能读取多少数据 |
get() |
从Buffer中读取数据 |
rewind() |
将position置零,以便重新读取数据 |
clear() |
将position置零,limit设置为capacity,相当于清空数据 |
compact() | 将所有未读的数据拷贝到Buffer起始处,然后将position设置到最后一个未读元素的后面。可以防止覆盖未读数据 |
mark() | 在Buffer的某个position设置标记 |
reset() | 重置position到之前mark()设置的位置 |
Selector允许单个线程处理多个Channel,如果同时打开了多个通道,但是每个通道的流量很低,使用Selector将很好的提高性能。Selector适用于处理网络连接,使用更少的线程处理套接字,可以大大的减少上下文切换的开销。Selector是NIO实现异步IO的核心。
Selector的实现依平台而不同,以Windows为例,每个Selector会占用两个通常是相邻的、环回地址(127.0.0.1)上的端口。
要使用Selector,可以将多个Channel注册到Selector,然后调用其select方法,该方法会一直阻塞,直到某个注册的通道上有事件(有新的连接进入、有数据发送过来)就绪才返回。
抽象类SelectableChannel用于支持通道的注册,它提供以下方法:
方法 | 说明 | ||
register() |
将通道注册到某个Selector,并返回一个SelectionKey ,需要提供一个整数选项,表示Selector对通道上发生的什么事件感兴趣,这些事件包括: SelectionKey.OP_CONNECT:连接就绪,表示通道正准备连接到服务器 如果对多个事件感兴趣,可以进行按位与:
|
||
configureBlocking() | 配置通道的阻塞模式,与Selector一起使用时,Channel必须处于非阻塞模式 |
管道用于连接两个通道,其中一个通道用于读,另外一个用于写,因此管道的数据是单向流动的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
File txt = new File( "F:/Temp/Readme.txt" ); { /** 读文件代码示例 **/ FileInputStream fis = new FileInputStream( txt ); //从文件输入流获取文件通道 FileChannel fc = fis.getChannel(); //创建缓冲区,分配1KB内存 ByteBuffer buf = ByteBuffer.allocate( 1024 ); //从通道读取数据到缓冲区(对于缓冲区本身是写操作) fc.read( buf ); buf.flip();//修改缓冲区为读模式 while ( buf.hasRemaining() ) { System.out.println( buf.get() ); } fc.close(); fis.close(); } { /** 写文件代码示例 **/ FileOutputStream fos = new FileOutputStream( txt ); FileChannel fc = fos.getChannel(); ByteBuffer buf = ByteBuffer.allocate( 1024 ); for ( byte i = 0; i < 512; i++ ) { buf.put( i ); //写入数据到缓冲区 } buf.flip(); //将缓冲区写入到通道 fc.write( buf ); fc.close(); fos.close(); } { /** 同时读写文件代码示例 **/ String infile = "F:/Temp/src.txt"; String outfile = "F:/Temp/dest.txt"; FileInputStream fin = new FileInputStream( infile ); FileOutputStream fout = new FileOutputStream( outfile ); FileChannel fcin = fin.getChannel(); FileChannel fcout = fout.getChannel(); ByteBuffer buffer = ByteBuffer.allocate( 1024 ); while ( true ) { //一旦读完Buffer中的数据,需要让Buffer准备好再次被写入 //clear或compact方法可以完成此准备 buffer.clear(); int r = fcin.read( buffer ); //当到达流的结尾使,read方法返回-1 if ( r == -1 ) { break; } //flip方法将Buffer从写模式切换到读模式,flip: //将limit设置为position,记录上一次写了多少数据 //将position设置为0,用于记录读到哪里 buffer.flip(); fcout.write( buffer ); } fin.close(); fout.close(); } |
FileChannel具有transferTo、transferFrom方法,可以用于与其他通道进行直接的数据交换:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
{ RandomAccessFile fromFile = new RandomAccessFile( "src.txt", "rw" ); FileChannel fromChannel = fromFile.getChannel(); RandomAccessFile toFile = new RandomAccessFile( "dest.txt", "rw" ); FileChannel toChannel = toFile.getChannel(); long position = 0; //传输的起始位置 long count = fromChannel.size();//最大传输的字节数 //从源读取 toChannel.transferFrom( fromChannel, position, count ); //发送到目标 fromChannel.transferTo( position, count, toChannel ); } |
FileChannel还具有文件锁定功能,所有的锁定都是“劝告式的(advisory)”,可以获取整个文件或者其一部分的共享或排他锁定:
1 2 3 4 5 6 7 |
RandomAccessFile raf = new RandomAccessFile( "src.txt", "rw" ); FileChannel fc = raf.getChannel(); long start = 0; long end = fc.size() - 1; FileLock lock = fc.lock( start, end, false ); //执行敏感操作…… lock.release(); |
下面是基于NIO的Echo服务的示例,忽略了部分的异常处理逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
package cc.gmem.study.network.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cc.gmem.study.network.basic.Helper; public class NIOEchoServer { private static final String ENCODING = "ISO-8859-1"; private static Map<SocketChannel, List<byte[]>> scData = new HashMap<SocketChannel, List<byte[]>>(); private static final Logger LOGGER = LoggerFactory.getLogger( NIOEchoServer.class ); public static void main( String[] args ) throws IOException { //创建新的选择器 Selector selector = Selector.open(); //每个线程只需要一个缓冲区 ByteBuffer buf = ByteBuffer.allocate( 8192 ); { //创建服务器套接字通道 ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking( false ); //任何通道都必须配置问非阻塞的,否则异步IO无法工作 //通过底层的套接字对象绑定端口 ServerSocket ss = ssc.socket(); InetSocketAddress address = new InetSocketAddress( Helper.DEFAULT_PORT ); ss.bind( address ); //注册监听:新的客户端连接 //注意:可以针对不同的事件类型注册多次;多个服务器套接字通道也可以注册到同一个选择器 //SelectionKey可以用于取消通道的注册 SelectionKey key = ssc.register( selector, SelectionKey.OP_ACCEPT ); //这是服务器套接字通道唯一支持的事件类型 LOGGER.debug( "Registered ServerSocketChannel, SelectionKey: {}", key ); } //事件处理循环 while ( true ) { //调用Select方法并阻塞,直到至少有一个事件发生,该调用会返回事件的个数 LOGGER.debug( "Prepare to do blocked selection..." ); int num = selector.select(); //得到该选择器上注册的所有监听 Set<SelectionKey> keys = selector.selectedKeys(); LOGGER.debug( "{} io-ready keys, selected-key set: {}", num, keys.size() ); //遍历,并检查各类型的事件有无发生 for ( Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext(); ) { SelectionKey key = iter.next(); //移除,否则同一键在下一轮还会处于激活状态 //每个新到来的连接请求都会生成自己的SelectionKey iter.remove(); if ( key.isAcceptable() ) { //该事件类型是针对“服务器套接字通道的” ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); //接受此连接 scData.put( sc, new ArrayList<byte[]>() ); sc.configureBlocking( false ); //设置新创建的套接字为非阻塞 sc.write( ByteBuffer.wrap( "Welcome to NIO Echo Server.\n".getBytes( ENCODING ) ) ); LOGGER.debug( "Client connected: {}", sc.socket().getRemoteSocketAddress() ); //将新创建的套接字注册到选择器,监听其读就绪事件 sc.register( selector, SelectionKey.OP_READ ); } else if ( key.isReadable() ) { //该事件类型是针对“套接字通道的” SocketChannel sc = (SocketChannel) key.channel(); //切换到写模式 buf.clear(); int bytesCount = -1; try { bytesCount = sc.read( buf ); //非阻塞模式下,操作会立即返回 } catch ( IOException e ) { LOGGER.warn( e.getMessage() ); } if ( bytesCount < 0 ) { LOGGER.debug( "Connection closed by peer : {}", sc.socket().getRemoteSocketAddress() ); sc.close(); //关闭通道 key.cancel(); //取消键注册,下一次select()时该键会被彻底移除 } else { byte[] data = new byte[bytesCount]; //切换到读模式 buf.flip(); buf.get( data ); scData.get( sc ).add( data ); String msg = new String( data, ENCODING ); LOGGER.debug( "Received client message: {}", msg ); //改变此键的关注点为写 key.interestOps( SelectionKey.OP_WRITE ); if ( "bye".equals( msg ) ) { LOGGER.debug( "Echo service finished for : {}", sc.socket().getRemoteSocketAddress() ); sc.close(); key.cancel(); } } } else if ( key.isWritable() ) { //切换到写模式 buf.clear(); //回响:将先前接收到的信息发回给客户端 SocketChannel sc = (SocketChannel) key.channel(); int byteCount = 0; for ( Iterator<byte[]> it = scData.get( sc ).iterator(); it.hasNext(); ) { byte[] data = it.next(); buf.put( data ); byteCount += data.length; it.remove(); } //切换到读模式 buf.flip(); sc.write( buf );//非阻塞模式下,操作会立即返回 LOGGER.debug( "{} bytes of message echoed for : {}", byteCount, sc.socket().getRemoteSocketAddress() ); //改变此键的关注点为读 key.interestOps( SelectionKey.OP_READ ); } } } } } |
下面是EchoClient的简单示例:
1 2 3 4 5 6 7 8 |
SocketChannel sc = SocketChannel.open(); sc.configureBlocking( false ); //由于非阻塞,因此下面的connect调用会立即返回 sc.connect( new InetSocketAddress( "127.0.0.1", Helper.DEFAULT_PORT ) ); while ( !sc.finishConnect() ) { //等待连接成功,或者执行其它操作 } |
使用CharsetEncoder、CharsetDecoder分别可以进行字符串的编码、解码:
1 2 3 4 5 6 7 8 |
Charset utf8 = Charset.forName( "UTF-8" ); CharsetDecoder decoder = utf8.newDecoder(); CharsetEncoder encoder = utf8.newEncoder(); ByteBuffer idata = ByteBuffer.allocate( 1024 ); //解码为字符缓冲 CharBuffer cb = decoder.decode( idata ); //编码为字节缓冲 ByteBuffer odata = encoder.encode( cb ); |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
Pipe pipe = Pipe.open(); ByteBuffer buf = ByteBuffer.allocate( 48 ); //要写入数据到管道,必须获取sink通道 Pipe.SinkChannel sink = pipe.sink(); buf.clear(); buf.put( new byte[] { 19, 12, 19, 89 } ); buf.flip(); while ( buf.hasRemaining() ) { sink.write( buf ); } //要从管道读取数据,必须使用source通道 Pipe.SourceChannel source = pipe.source(); buf.clear(); int bytesCount = source.read( buf ); buf.flip(); byte[] bytes = new byte[bytesCount]; buf.get( bytes ); |
Leave a Reply