Python网络编程
TCP编程代码示例
单线程Echo服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
from socket import * # @UnusedWildImport DEFAULT_PORT = 1918 if __name__ == '__main__': # 创建基于IPv4的TCP套接字对象 s = socket( AF_INET, SOCK_STREAM ) # 绑定到通配符地址的1918端口 s.bind( ( '0.0.0.0', DEFAULT_PORT ) ) logging.debug( 'Echo server is listening on port %d', DEFAULT_PORT ) # 开始监听,最大排队数量(backlog)为10 s.listen( 10 ) while True: # 接受一个客户端连接请求,返回套接字对象和地址的元组 client, addr = s.accept() logging.debug( '%s connected', addr ) msg = client.recv( 1024 ) logging.debug( 'Received message : %s', msg ) client.send( msg ) client.close() |
以下是客户端代码:
1 2 3 4 5 6 7 8 9 10 |
from socket import * # @UnusedWildImport DEFAULT_PORT = 1918 if __name__ == '__main__': # 创建基于IPv4的TCP套接字对象 s = socket( AF_INET, SOCK_STREAM ) # 连接到服务器端 s.connect( ( '127.0.0.1', DEFAULT_PORT ) ) s.send( 'Hello Server!' ) logging.debug( 'Echo from server: %s', s.recv( 1024 ) ) |
基于asyncore模块的异步Echo服务
asyncore模块将网络活动抽象为事件,由事件循环分派出去进行异步处理。事件循环通过select()或者poll()系统调用构建。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# 主分发器,关联服务器端监听套接字 class EchoSocketDispatcher( asyncore.dispatcher ): def __init__( self, port ): asyncore.dispatcher.__init__( self ) # 创建当前分发器关联的套接字对象 self.create_socket( socket.AF_INET, socket.SOCK_STREAM ) self.bind( ( '0.0.0.0', port ) ) self.listen( 1024 ) def handle_accept( self ): client, addr = self.accept() logging.debug( 'Accepted connection from %s', addr ) return EchoDispatcher( client ) # 子分发器,处理单个客户端连接套接字 class EchoDispatcher( asyncore.dispatcher ): def __init__( self, client ): asyncore.dispatcher.__init__( self, client ) self.chunk = None # 何时允许读 def readable( self ): logging.debug( 'HH' ) return True # 何时允许写 def writable( self ): return self.chunk != None # 处理读取 def handle_read( self ): self.chunk = self.recv( 8192 ) logging.debug( 'Received message: %s', self.chunk ) # 处理写入 def handle_write( self ): self.send( self.chunk ) self.chunk = None def handle_close( self ): logging.debug( 'Connection closed by peer.' ) asyncore.dispatcher.handle_close( self ) DEFAULT_PORT = 1918 if __name__ == '__main__': dispatcher = EchoSocketDispatcher( DEFAULT_PORT ) logging.debug( 'Start polling on %d', DEFAULT_PORT ) # 持续执行轮询 asyncore.loop( use_poll=True, timeout=10 ) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# Tasklet、SystemCall、Scheduler模拟了一个微型的操作系统 # 模拟进程 class Tasklet( object ): def __init__( self, target ): self.target = target # 当前目标协程 self.sendval = None # 协程恢复时发送的值 self.stack = [] # 历史协程的栈,对方法调用机制的一种模拟 def run( self ): try: # 执行协程到下一次退出,并获取协程的返回值 result = self.target.send( self.sendval ) if isinstance( result, SystemCall ): # 如果是一个“系统调用”包装对象,栈状态不变,类似于中断的效果 return result # 当前目标将期望此系统调用的结果被发送给它,以继续执行 elif isinstance( result, types.GeneratorType ): # 如果结果是一个生成器对象实例,相当于调用一个新方法,需要将当前目标压栈 self.stack.append( self.target ) self.sendval = None self.target = result else: # 如果结果不是一个生成器对象实例,相当于新方法调用返回,需要废弃当前目标,并弹出栈顶作为新目标 if not self.stack : return self.sendval = result self.target = self.stack.pop() except StopIteration: # 当前协程已经终止,需要移除,并弹出栈顶作为新的目标 if not self.stack: raise self.sendval = None self.target = self.stack.pop() # 模拟系统调用 class SystemCall( object ): def handle( self, sched, task ): pass # 读写“系统调用”实现 class ReadWait( SystemCall ): def __init__( self, f ): self.file = f def handle( self, sched, task ): fd = self.file.fileno() sched.readwait( task, fd ) class WriteWait( SystemCall ): def __init__( self, f ): self.file = f def handle( self, sched, task ): fd = self.file.fileno() sched.writewait( task, fd ) # 调度程序,相当于操作系统的调度例程 class Scheduler( object ): def __init__( self ): # 这个队列相当于操作系统的进程集 self.task_queue = collections.deque() self.read_waiting = {} self.write_waiting = {} self.taskcount = 0 def new( self, target ): newtask = Tasklet( target ) self.schedule( newtask ) self.taskcount += 1 def schedule( self, task ): self.task_queue.append( task ) def readwait( self, task, fd ): self.read_waiting[fd] = task def writewait( self, task, fd ): self.write_waiting[fd] = task def mainloop( self, count=-1, timeout=None ): while self.taskcount: # 如果有I/O事件的队列,那么先轮询I/O事件 if self.read_waiting or self.write_waiting: # 如果进程队列为空则等待时间为timeout,否则等待时间为0 # 队列为空的场景:没有创建进程;所有进程都在I/O等待集上 # timeout默认为一直等待,直到有可能的描述符 wait = 0 if self.task_queue else timeout # 查看一组文件描述符的输入、输出、异常状态。返回输入、输出、异常准备就绪的列表的元组 # 前三个参数是整数描述符的列表;或者带有fileno()方法的对象(该方法返回文件描述符) # wait不指定会一直等待直到至少一个文件描述符准备好为止,为0则指进行一次轮询即返回 r, w , e = select.select( self.read_waiting, self.write_waiting, [], wait ) # 将就绪的文件描述符从等待集中移除,加入到正常调度集中 for fd in r: self.schedule( self.read_waiting.pop( fd ) ) for fd in w: self.schedule( self.write_waiting.pop( fd ) ) # 逐个执行队列上的任务 while self.task_queue: # 取出一个任务,从进程列表中移除 task = self.task_queue.popleft() try: # 执行这个任务 result = task.run() if isinstance( result, SystemCall ): # 模拟系统调用陷入内核 result.handle( self, task ) else: # 其它的,要么相当于方法调用,要么相当于方法返回,继续调度 self.schedule( task ) except StopIteration : # 不需要再考虑此任务,其生命周期已经结束 self.taskcount -= 1 else: if count > 0: count -= 1 if count == 0: return # Echo服务器协程 from socket import socket, AF_INET, SOCK_STREAM def EchoServer( host, port , sched ): # 服务器监听套接字 s = socket( AF_INET, SOCK_STREAM ) s.bind( ( host , port ) ) logging.debug( 'EchoServer listening on %s:%d', host, port ) s.listen( 128 ) while True: # 等待服务器监听套接字可读 yield ReadWait( s ) conn, addr = s.accept() logging.debug( 'Client connected: %s', addr ) sched.new( EchoSocket( conn ) ) def EchoSocket( conn ): while True: # 等待套接字可读 yield ReadWait( conn ) chunk = conn.recv( 1024 ) if chunk: logging.debug( 'Received message: %s', chunk ) yield WriteWait( conn ) conn.send( chunk ) if __name__ == '__main__': sched = Scheduler() sched.new( EchoServer( '0.0.0.0', 1918, sched ) ) sched.mainloop( -1, None ) |
服务定位器模式 →
Leave a Reply