Java NIO浅析
NIO即“New I/O”(也有人将其解释为Non-blocking I/O),是JDK 1.4引入的新的输入/输出API,它提供高速的、基于“块”的 I/O,比起Java传统基于“流”的I/O组件具有更好的性能。
NIO支持异步的IO操作。一般在进行 read() 调用时,会阻塞直至有可供读取的数据;同样, write() 调用会阻塞直至数据能够写入,异步IO则是一种不阻塞的读写数据的方法,使用异步IO,既不需要多个大量线程,也不需要使用轮询。
- FileChannel:用于执行文件读写。该类提供以下特有方法:
方法 说明 lock() 可以进行文件锁定,包括部分锁定/完全锁定,锁定的方式包括共享锁/独占锁。为兼容性考虑,应当尽量只使用独占锁 position() 获取或者设置FileChannel当前所处的位置:
1234#获取当前位置long pos = fc.position();#设置当前位置,跳转到1KB之后fc.position(pos + 1024);size() 返回关联的文件的尺寸 truncate() 将文件截断到指定的大小 force() 将通道里尚未写入磁盘的数据强制写到磁盘上 - DatagramChannel:用于通过UDP协议读写数据
- SocketChannel:用于通过TCP读写数据
- ServerSocketChannel:作为服务器端套接字,来监听新的TCP连接,对于每个新连接会创建一个SocketChannel对象
- 写入数据到Buffer,如果Buffer非空,新数据将附加在原有数据后面
- 调用flip方法,切换到读模式
- 从Buffer中读取数据
- 调用clear或compact方法,进行清理
属性/方法 | 说明 |
capacity | 即Buffer对应的内存块的大小,一旦Buffer已满,必须读取或者清除其数据才能继续写入 |
position |
写模式,position表示当前的位置,初始值为0,写一个数据则++ 读模式,开始时有flip将position置0,每读一次,position移动到下一个可读位置 |
limit |
写模式,limit表示最多可以向其中写入多少数据 读模式,limit表示最多能读取多少数据 |
get() |
从Buffer中读取数据 |
rewind() |
将position置零,以便重新读取数据 |
clear() |
将position置零,limit设置为capacity,相当于清空数据 |
compact() | 将所有未读的数据拷贝到Buffer起始处,然后将position设置到最后一个未读元素的后面。可以防止覆盖未读数据 |
mark() | 在Buffer的某个position设置标记 |
reset() | 重置position到之前mark()设置的位置 |
方法 | 说明 | ||
register() |
将通道注册到某个Selector,并返回一个SelectionKey ,需要提供一个整数选项,表示Selector对通道上发生的什么事件感兴趣,这些事件包括: SelectionKey.OP_CONNECT:连接就绪,表示通道正准备连接到服务器 如果对多个事件感兴趣,可以进行按位与:
configureBlocking() | 配置通道的阻塞模式,与Selector一起使用时,Channel必须处于非阻塞模式 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
File txt = new File( "F:/Temp/Readme.txt" ); { /** 读文件代码示例 **/ FileInputStream fis = new FileInputStream( txt ); //从文件输入流获取文件通道 FileChannel fc = fis.getChannel(); //创建缓冲区,分配1KB内存 ByteBuffer buf = ByteBuffer.allocate( 1024 ); //从通道读取数据到缓冲区(对于缓冲区本身是写操作) buf ); buf.flip();//修改缓冲区为读模式 while ( buf.hasRemaining() ) { System.out.println( buf.get() ); } fc.close(); fis.close(); } { /** 写文件代码示例 **/ FileOutputStream fos = new FileOutputStream( txt ); FileChannel fc = fos.getChannel(); ByteBuffer buf = ByteBuffer.allocate( 1024 ); for ( byte i = 0; i < 512; i++ ) { buf.put( i ); //写入数据到缓冲区 } buf.flip(); //将缓冲区写入到通道 fc.write( buf ); fc.close(); fos.close(); } { /** 同时读写文件代码示例 **/ String infile = "F:/Temp/src.txt"; String outfile = "F:/Temp/dest.txt"; FileInputStream fin = new FileInputStream( infile ); FileOutputStream fout = new FileOutputStream( outfile ); FileChannel fcin = fin.getChannel(); FileChannel fcout = fout.getChannel(); ByteBuffer buffer = ByteBuffer.allocate( 1024 ); while ( true ) { //一旦读完Buffer中的数据,需要让Buffer准备好再次被写入 //clear或compact方法可以完成此准备 buffer.clear(); int r = buffer ); //当到达流的结尾使,read方法返回-1 if ( r == -1 ) { break; } //flip方法将Buffer从写模式切换到读模式,flip: //将limit设置为position,记录上一次写了多少数据 //将position设置为0,用于记录读到哪里 buffer.flip(); fcout.write( buffer ); } fin.close(); fout.close(); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
{ RandomAccessFile fromFile = new RandomAccessFile( "src.txt", "rw" ); FileChannel fromChannel = fromFile.getChannel(); RandomAccessFile toFile = new RandomAccessFile( "dest.txt", "rw" ); FileChannel toChannel = toFile.getChannel(); long position = 0; //传输的起始位置 long count = fromChannel.size();//最大传输的字节数 //从源读取 toChannel.transferFrom( fromChannel, position, count ); //发送到目标 fromChannel.transferTo( position, count, toChannel ); } |
1 2 3 4 5 6 7 |
RandomAccessFile raf = new RandomAccessFile( "src.txt", "rw" ); FileChannel fc = raf.getChannel(); long start = 0; long end = fc.size() - 1; FileLock lock = fc.lock( start, end, false ); //执行敏感操作…… lock.release(); |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
package; import; import; import; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import; public class NIOEchoServer { private static final String ENCODING = "ISO-8859-1"; private static Map<SocketChannel, List<byte[]>> scData = new HashMap<SocketChannel, List<byte[]>>(); private static final Logger LOGGER = LoggerFactory.getLogger( NIOEchoServer.class ); public static void main( String[] args ) throws IOException { //创建新的选择器 Selector selector =; //每个线程只需要一个缓冲区 ByteBuffer buf = ByteBuffer.allocate( 8192 ); { //创建服务器套接字通道 ServerSocketChannel ssc =; ssc.configureBlocking( false ); //任何通道都必须配置问非阻塞的,否则异步IO无法工作 //通过底层的套接字对象绑定端口 ServerSocket ss = ssc.socket(); InetSocketAddress address = new InetSocketAddress( Helper.DEFAULT_PORT ); ss.bind( address ); //注册监听:新的客户端连接 //注意:可以针对不同的事件类型注册多次;多个服务器套接字通道也可以注册到同一个选择器 //SelectionKey可以用于取消通道的注册 SelectionKey key = ssc.register( selector, SelectionKey.OP_ACCEPT ); //这是服务器套接字通道唯一支持的事件类型 LOGGER.debug( "Registered ServerSocketChannel, SelectionKey: {}", key ); } //事件处理循环 while ( true ) { //调用Select方法并阻塞,直到至少有一个事件发生,该调用会返回事件的个数 LOGGER.debug( "Prepare to do blocked selection..." ); int num =; //得到该选择器上注册的所有监听 Set<SelectionKey> keys = selector.selectedKeys(); LOGGER.debug( "{} io-ready keys, selected-key set: {}", num, keys.size() ); //遍历,并检查各类型的事件有无发生 for ( Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext(); ) { SelectionKey key =; //移除,否则同一键在下一轮还会处于激活状态 //每个新到来的连接请求都会生成自己的SelectionKey iter.remove(); if ( key.isAcceptable() ) { //该事件类型是针对“服务器套接字通道的” ServerSocketChannel ssc = (ServerSocketChannel); SocketChannel sc = ssc.accept(); //接受此连接 scData.put( sc, new ArrayList<byte[]>() ); sc.configureBlocking( false ); //设置新创建的套接字为非阻塞 sc.write( ByteBuffer.wrap( "Welcome to NIO Echo Server.\n".getBytes( ENCODING ) ) ); LOGGER.debug( "Client connected: {}", sc.socket().getRemoteSocketAddress() ); //将新创建的套接字注册到选择器,监听其读就绪事件 sc.register( selector, SelectionKey.OP_READ ); } else if ( key.isReadable() ) { //该事件类型是针对“套接字通道的” SocketChannel sc = (SocketChannel); //切换到写模式 buf.clear(); int bytesCount = -1; try { bytesCount = buf ); //非阻塞模式下,操作会立即返回 } catch ( IOException e ) { LOGGER.warn( e.getMessage() ); } if ( bytesCount < 0 ) { LOGGER.debug( "Connection closed by peer : {}", sc.socket().getRemoteSocketAddress() ); sc.close(); //关闭通道 key.cancel(); //取消键注册,下一次select()时该键会被彻底移除 } else { byte[] data = new byte[bytesCount]; //切换到读模式 buf.flip(); buf.get( data ); scData.get( sc ).add( data ); String msg = new String( data, ENCODING ); LOGGER.debug( "Received client message: {}", msg ); //改变此键的关注点为写 key.interestOps( SelectionKey.OP_WRITE ); if ( "bye".equals( msg ) ) { LOGGER.debug( "Echo service finished for : {}", sc.socket().getRemoteSocketAddress() ); sc.close(); key.cancel(); } } } else if ( key.isWritable() ) { //切换到写模式 buf.clear(); //回响:将先前接收到的信息发回给客户端 SocketChannel sc = (SocketChannel); int byteCount = 0; for ( Iterator<byte[]> it = scData.get( sc ).iterator(); it.hasNext(); ) { byte[] data =; buf.put( data ); byteCount += data.length; it.remove(); } //切换到读模式 buf.flip(); sc.write( buf );//非阻塞模式下,操作会立即返回 LOGGER.debug( "{} bytes of message echoed for : {}", byteCount, sc.socket().getRemoteSocketAddress() ); //改变此键的关注点为读 key.interestOps( SelectionKey.OP_READ ); } } } } } |
1 2 3 4 5 6 7 8 |
SocketChannel sc =; sc.configureBlocking( false ); //由于非阻塞,因此下面的connect调用会立即返回 sc.connect( new InetSocketAddress( "", Helper.DEFAULT_PORT ) ); while ( !sc.finishConnect() ) { //等待连接成功,或者执行其它操作 } |
1 2 3 4 5 6 7 8 |
Charset utf8 = Charset.forName( "UTF-8" ); CharsetDecoder decoder = utf8.newDecoder(); CharsetEncoder encoder = utf8.newEncoder(); ByteBuffer idata = ByteBuffer.allocate( 1024 ); //解码为字符缓冲 CharBuffer cb = decoder.decode( idata ); //编码为字节缓冲 ByteBuffer odata = encoder.encode( cb ); |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
Pipe pipe =; ByteBuffer buf = ByteBuffer.allocate( 48 ); //要写入数据到管道,必须获取sink通道 Pipe.SinkChannel sink = pipe.sink(); buf.clear(); buf.put( new byte[] { 19, 12, 19, 89 } ); buf.flip(); while ( buf.hasRemaining() ) { sink.write( buf ); } //要从管道读取数据,必须使用source通道 Pipe.SourceChannel source = pipe.source(); buf.clear(); int bytesCount = buf ); buf.flip(); byte[] bytes = new byte[bytesCount]; buf.get( bytes ); |
