使用Java进行网络编程
Java网络编程的特点
- 底层细节被隐藏:传统的网络编程依赖操作系统提供的各种函数,使用起来异常复杂,需要关注网络协议的底层细节。而Java平台对网络编程接口进行一致性封装,内置的线程机制也便于操控并发的网络连接
Java网络编程原生接口
类/接口 | 说明 |
InetAddress | 代表一个IP地址,支持根据主机名、域名获取IP地址,或者执行DNS反查,亦可对地址的性质进行判断 |
InetSocketAddress | 代表一个InetAddress与有一个端口号的组合 |
ServerSocket | 代表服务器端的侦听对象,本身和套接字无关(比较奇怪的命名)。其accept方法一旦侦听到一个客户端连接,即返回代表套接字的Socket对象 |
Socket | 代表连接双方的套接字对象。客户用它初始一次连接,服务器端接受连接后也得到该对象。可以使用getInputStream()、getOutputStream()方法来读写Socket |
TCP编程
基于原生API编程
传统Socket API的多客户端回响服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
package cc.gmem.study.network; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class EchoServer { private static final Logger LOGGER = LoggerFactory.getLogger( EchoServer.class ); public static class EchoThread extends Thread { private Socket socket; public EchoThread( Socket socket ) { this.socket = socket; } @Override public void run() { process(); } private void process() { try { BufferedReader in = new BufferedReader( new InputStreamReader( socket.getInputStream() ) ); //每次println自动刷新缓冲区,使信息能正式通过网络传递出去 PrintWriter out = new PrintWriter( new BufferedWriter( new OutputStreamWriter( socket.getOutputStream() ) ), true ); for ( ;; ) { String request = in.readLine(); if ( "bye".equalsIgnoreCase( request ) ) break; out.println( request ); } } catch ( IOException e ) { LOGGER.error( e.getMessage(), e ); } finally { try { InetSocketAddress addr = (InetSocketAddress) socket.getRemoteSocketAddress(); LOGGER.debug( "{}:{} closing", addr.getHostName(), addr.getPort() ); socket.close(); } catch ( IOException e ) { } } } } public static void main( String[] args ) throws IOException { ServerSocket listener = new ServerSocket( Helper.DEFAULT_PORT ); LOGGER.debug( "Listening on {} ", Helper.DEFAULT_PORT ); for ( ;; ) { Socket socket = listener.accept(); InetSocketAddress addr = (InetSocketAddress) socket.getRemoteSocketAddress(); LOGGER.debug( "{}:{} connected", addr.getHostName(), addr.getPort() ); EchoThread thread = new EchoThread( socket ); thread.start(); } } } |
但凡是这种每个Socket对应一个处理线程的服务器端,其scaliblity都非常差。因为线程是较为昂贵的资源,操作系统中线程数量过多时,上下文切换的开销将很大,并且操作系统支持的线程是有限的。
相比起线程,Socket本身是非常廉价的资源,维持一个TCP连接并不需要消耗多少内存或者网络流量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
package cc.gmem.study.network; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class EchoClient { private static final Logger LOGGER = LoggerFactory.getLogger( EchoClient.class ); public static void main( String[] args ) throws Throwable { Socket socket = new Socket( InetAddress.getByName( null ), Helper.DEFAULT_PORT ); try { BufferedReader in = new BufferedReader( new InputStreamReader( socket.getInputStream() ) ); PrintWriter out = new PrintWriter( new BufferedWriter( new OutputStreamWriter( socket.getOutputStream() ) ), true ); for ( int i = 0; i < 10; i++ ) { out.println( "Hello " + i ); String echo = in.readLine(); LOGGER.debug( "Message from server: {}", echo ); } out.println( "bye" ); } finally { InetSocketAddress addr = (InetSocketAddress) socket.getRemoteSocketAddress(); LOGGER.debug( "Disconnecting from {}:{}", addr.getHostName(), addr.getPort() ); socket.close(); } } } |
基于NIO的多客户端回响服务
使用Java的NIO编程接口,可以避免一个线程对应一个套接字的处理模式,而让单个线程管理多个套接字,结合使用Selector,可以实现异步IO,大大提升系统的吞吐能力。
代码样例请参考:Java NIO浅析
基于NIO2的多客户端回响服务
NIO2从JDK1.7开始引入,NIO2允许你发起一个IO相关的操作,并提供一个CompletionHandler,该Handler会在前述的I/O操作完毕后自动调用。
NIO2与NIO的最重要区别是,NIO2将线程相关的逻辑交由底层代码处理(JDK自己处理事件循环),开发者不必检查通道中是否有事件发生(select)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
package cc.gmem.study.network.nio2; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import cc.gmem.study.network.basic.Helper; public class NIO2EchoServer { public static void main( String[] args ) throws IOException { final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open(); InetSocketAddress address = new InetSocketAddress( Helper.DEFAULT_PORT ); serverChannel.bind( address ); //注册一个“接受”操作。开始接受客户端连接,一旦某个客户端被接受,即调用完成处理器 serverChannel.accept( null, new CompletionHandler<AsynchronousSocketChannel, Object>() { @Override public void completed( final AsynchronousSocketChannel channel, Object attachment ) { //继续一个“接受”操作。等待下一个连接的到来 serverChannel.accept( null, this ); ByteBuffer buffer = ByteBuffer.allocate( 100 ); //此通道专用的字节缓冲 //注册一个“读”操作,一旦读取(读取的数据写到buffer中,第一个参数)完毕,即调用完成处理器 //注意:此处理器是一次性的,读完调用一次,然后即丢弃 channel.read( buffer, buffer, new EchoCompletionHandler( channel ) ); } @Override public void failed( Throwable throwable, Object attachment ) { try { //如果出错,关闭套接字 serverChannel.close(); } catch ( IOException e ) { } } } ); } private static final class EchoCompletionHandler implements CompletionHandler<Integer, ByteBuffer> { private final AsynchronousSocketChannel channel; EchoCompletionHandler( AsynchronousSocketChannel channel ) { this.channel = channel; } /** * 该类是读的完成处理器 * @param buffer 读取到的数据 */ @Override public void completed( Integer result, ByteBuffer buffer ) { buffer.flip();//切换到读模式 //注册一个写操作,一旦写入(待写入的数据从buffer中读取)完毕,即调用完成处理器 channel.write( buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed( Integer result, ByteBuffer buffer ) { if ( buffer.hasRemaining() ) { //如果缓冲中仍然有数据,再次注册写操作,继续写 channel.write( buffer, buffer, this ); } else { buffer.compact(); //否则,注册读操作,继续读取 channel.read( buffer, buffer, EchoCompletionHandler.this ); } } @Override public void failed( Throwable exc, ByteBuffer attachment ) { try { channel.close(); } catch ( IOException e ) { } } } ); } @Override public void failed( Throwable exc, ByteBuffer attachment ) { try { channel.close(); } catch ( IOException e ) { } } } } |
基于框架的编程
基于Netty 5.0的回响服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
package cc.gmem.study.network.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.ReferenceCountUtil; import java.nio.charset.Charset; import cc.gmem.study.network.basic.Helper; public class NettyEchoServer { //继承一个缺省失配,只覆盖必要的方法 //注意:该类的实例会为每个通道分别创建一个,因此可以线程安全的使用 //Netty 4.x的ChannelInboundHandler/ChannelOutboundHandler现在被合并了 public static class NettyEchoServerHandler extends ChannelHandlerAdapter { private boolean closeAfterEcho; /** * 每当从客户端接收到新的数据时,该回调方法被执行 */ @Override public void channelRead( final ChannelHandlerContext ctx, Object msg ) { try { //ByteBuf类似于NIO的Buffer,它是一种基于引用计数(reference-counted)的对象 //应当被明确的释放 ByteBuf buf = ( (ByteBuf) msg ); while ( buf.isReadable() ) { //如果缓冲中还有数据可读,则逐字节的读取并显示 System.out.print( (char) buf.readByte() ); System.out.flush(); } ChannelFuture cf = ctx.write( msg ); //将接收到的消息逐字的写回去 ctx.flush(); //flush方法导致刷空写缓冲,导致数据立即通过网络发送到客户端 if ( closeAfterEcho ) { //对于任何Future对象,均可以添加监听器回调 //回调会在Future对象Done时调用 cf.addListener( new ChannelFutureListener() { public void operationComplete( ChannelFuture future ) { ctx.close(); } } ); } } finally { ReferenceCountUtil.release( msg ); } } /** * 当发生I/O错误,或者Handler在处理I/O事件时抛出异常,Netty会调用此回调 */ @Override public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause ) { cause.printStackTrace(); //一般,通道都应该被关闭;可选的,可能在像客户端发送一个消息后,再关闭 ctx.close(); } } public static void main( String[] args ) throws InterruptedException { //事件循环组包含若干线程,用来处理I/O事件 //boss组用来接受(Accept)连接,一旦它接受连接,就将其注册给worker EventLoopGroup bossGroup = new NioEventLoopGroup(); //worker组用来处理已经接受的连接 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //ServerBootstrap是一个NIO服务的辅助启动类 ServerBootstrap b = new ServerBootstrap(); b.group( bossGroup, workerGroup ) //需要创建的通道类,这里的通道概念与NIO类似,但接口不兼容 //NioServerSocketChannel用于接受新的入站连接(SocketChannel) .channel( NioServerSocketChannel.class ) //对于所有的被接受的SocketChannel,此childHandler都会对其进行处理 //ChannelInitializer是一个特殊的通道处理器(ChannelHandler) //用于协助用户完成对新SocketChannel的配置 .childHandler( new ChannelInitializer() { @Override public void initChannel( SocketChannel ch ) throws Exception { //ChannelInitializer的最常见用法:为新的SocketChannel的通道管线(ChannelPipeline)添加ChannelHandler,以实现业务逻辑 //如果业务比较复杂,可能在管线中添加多个ChannelHandler //例如,这里添加了三个额外的处理器 ChannelPipeline pipeline = ch.pipeline(); //基于一个或者多个界定符,来分隔收到的ByteBuf pipeline.addLast( "framer", new DelimiterBasedFrameDecoder( Integer.MAX_VALUE, Delimiters.nulDelimiter() ) ); Charset charset = Charset.forName( "UTF-8" ); //基于UTF-8进行编解码 pipeline.addLast( "decoder", new StringDecoder( charset ) ); pipeline.addLast( "encoder", new StringEncoder( charset ) ); pipeline.addLast( "handler", new NettyEchoServerHandler() ); } } ) //在这里,可以指定与特定通道实现相关的参数,例如这里设置了与TCP相关的参数 .option( ChannelOption.SO_BACKLOG, 128 ) .childOption( ChannelOption.SO_KEEPALIVE, true ); //绑定到所有网络接口的1918端口 ChannelFuture f = b.bind( Helper.DEFAULT_PORT ).sync(); //等待Future完成 f.channel().closeFuture().sync(); } finally { //关闭线程组被回收资源 workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } } |
基于MINA 2.0的回响服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
package cc.gmem.study.network.mina; import java.io.IOException; import java.net.InetSocketAddress; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.util.NoopFilter; import org.apache.mina.transport.socket.SocketAcceptor; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import cc.gmem.study.network.basic.Helper; public class MinaEchoServer { private final static Logger LOGGER = LoggerFactory.getLogger( EchoServer.class ); public static void main( String[] args ) throws IOException { SocketAcceptor acceptor = new NioSocketAcceptor(); acceptor.setReuseAddress( true ); //处理器包含一系列的回调接口,覆盖连接生命周期的各种事件 acceptor.setHandler( new IoHandlerAdapter() { /** * 当连接建立时,I/O处理器线程执行的操作 */ @Override public void sessionCreated( IoSession session ) { //设置连接空闲时间 session.getConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 ); } /** * 当连接关闭时执行的操作 */ @Override public void sessionClosed( IoSession session ) throws Exception { LOGGER.debug( "Session closed." ); } /** * 与sessionCreated之后被执行,不由I/O处理器调用 */ @Override public void sessionOpened( IoSession session ) throws Exception { LOGGER.debug( "Session opened." ); } /** * 连接空闲后执行操作 */ @Override public void sessionIdle( IoSession session, IdleStatus status ) { LOGGER.debug( "Session idle: {}", status ); } /** * 出现异常时执行的操作 */ @Override public void exceptionCaught( IoSession session, Throwable cause ) { session.close( true ); } /** * 接受到消息时执行的操作 */ @Override public void messageReceived( IoSession session, Object message ) throws Exception { LOGGER.info( "Received message : {}", message ); //缓冲对象 IoBuffer buf = (IoBuffer) message; session.write( buf.duplicate() ); } } ); DefaultIoFilterChainBuilder chain = acceptor.getFilterChain(); //过滤器拦截IoHandler事件,类似于ServletFilter,可以用于完成多种功能,例如身份验证、日志记录、负载控制 chain.addLast( "filter", new NoopFilter() ); acceptor.bind( new InetSocketAddress( Helper.DEFAULT_PORT ) ); } } |
Leave a Reply