Menu

  • Home
  • Work
    • Cloud
      • Virtualization
      • IaaS
      • PaaS
    • Java
    • Go
    • C
    • C++
    • JavaScript
    • PHP
    • Python
    • Architecture
    • Others
      • Assembly
      • Ruby
      • Perl
      • Lua
      • Rust
      • XML
      • Network
      • IoT
      • GIS
      • Algorithm
      • AI
      • Math
      • RE
      • Graphic
    • OS
      • Linux
      • Windows
      • Mac OS X
    • BigData
    • Database
      • MySQL
      • Oracle
    • Mobile
      • Android
      • IOS
    • Web
      • HTML
      • CSS
  • Life
    • Cooking
    • Travel
    • Gardening
  • Gallery
  • Video
  • Music
  • Essay
  • Home
  • Work
    • Cloud
      • Virtualization
      • IaaS
      • PaaS
    • Java
    • Go
    • C
    • C++
    • JavaScript
    • PHP
    • Python
    • Architecture
    • Others
      • Assembly
      • Ruby
      • Perl
      • Lua
      • Rust
      • XML
      • Network
      • IoT
      • GIS
      • Algorithm
      • AI
      • Math
      • RE
      • Graphic
    • OS
      • Linux
      • Windows
      • Mac OS X
    • BigData
    • Database
      • MySQL
      • Oracle
    • Mobile
      • Android
      • IOS
    • Web
      • HTML
      • CSS
  • Life
    • Cooking
    • Travel
    • Gardening
  • Gallery
  • Video
  • Music
  • Essay

Python网络编程

9
May
2011

Python网络编程

By Alex
/ in Network,Python
/ tags Coroutine
0 Comments
TCP编程代码示例
单线程Echo服务
EchoServer.py
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from socket import *  # @UnusedWildImport
DEFAULT_PORT = 1918
if __name__ == '__main__':
    # 创建基于IPv4的TCP套接字对象
    s = socket( AF_INET, SOCK_STREAM )
    # 绑定到通配符地址的1918端口
    s.bind( ( '0.0.0.0', DEFAULT_PORT ) )
    logging.debug( 'Echo server is listening on port %d', DEFAULT_PORT )
    # 开始监听,最大排队数量(backlog)为10
    s.listen( 10 )
    
    while True:
        # 接受一个客户端连接请求,返回套接字对象和地址的元组
        client, addr = s.accept()
        logging.debug( '%s connected', addr )
        msg = client.recv( 1024 )
        logging.debug( 'Received message : %s', msg )
        client.send( msg )
        client.close()

以下是客户端代码:

EchoClient.py
Python
1
2
3
4
5
6
7
8
9
10
from socket import *  # @UnusedWildImport
 
DEFAULT_PORT = 1918
if __name__ == '__main__':
    # 创建基于IPv4的TCP套接字对象
    s = socket( AF_INET, SOCK_STREAM )
    # 连接到服务器端
    s.connect( ( '127.0.0.1', DEFAULT_PORT ) )
    s.send( 'Hello Server!' )
    logging.debug( 'Echo from server: %s', s.recv( 1024 ) )
基于asyncore模块的异步Echo服务

asyncore模块将网络活动抽象为事件,由事件循环分派出去进行异步处理。事件循环通过select()或者poll()系统调用构建。

AsyncEchoServer.py
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 主分发器,关联服务器端监听套接字
class EchoSocketDispatcher( asyncore.dispatcher ):
    def __init__( self, port ):
        asyncore.dispatcher.__init__( self )
        # 创建当前分发器关联的套接字对象
        self.create_socket( socket.AF_INET, socket.SOCK_STREAM )
        self.bind( ( '0.0.0.0', port ) )
        self.listen( 1024 )
    def handle_accept( self ):
        client, addr = self.accept()
        logging.debug( 'Accepted connection from %s', addr )
        return EchoDispatcher( client )
# 子分发器,处理单个客户端连接套接字    
class EchoDispatcher( asyncore.dispatcher ):
    
    def __init__( self, client ):
        asyncore.dispatcher.__init__( self, client )
        self.chunk = None
    # 何时允许读
    def readable( self ):
        logging.debug( 'HH' )
        return True
    # 何时允许写
    def writable( self ):
        return self.chunk != None
    # 处理读取
    def handle_read( self ):
        self.chunk = self.recv( 8192 )
        logging.debug( 'Received message: %s', self.chunk )
    # 处理写入
    def handle_write( self ):
        self.send( self.chunk )
        self.chunk = None
    def handle_close( self ):
        logging.debug( 'Connection closed by peer.' )
        asyncore.dispatcher.handle_close( self )
        
DEFAULT_PORT = 1918    
if __name__ == '__main__':
    dispatcher = EchoSocketDispatcher( DEFAULT_PORT )
    logging.debug( 'Start polling on %d', DEFAULT_PORT )
    # 持续执行轮询
    asyncore.loop( use_poll=True, timeout=10 )
基于协程技术实现的异步Echo服务
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# Tasklet、SystemCall、Scheduler模拟了一个微型的操作系统
 
# 模拟进程
class Tasklet( object ):
    def __init__( self, target ):
        self.target = target  # 当前目标协程
        self.sendval = None  # 协程恢复时发送的值
        self.stack = []  # 历史协程的栈,对方法调用机制的一种模拟
    def run( self ):
        try:
            # 执行协程到下一次退出,并获取协程的返回值
            result = self.target.send( self.sendval )
            if isinstance( result, SystemCall ):
                # 如果是一个“系统调用”包装对象,栈状态不变,类似于中断的效果
                return result
                # 当前目标将期望此系统调用的结果被发送给它,以继续执行
            elif isinstance( result, types.GeneratorType ):
                # 如果结果是一个生成器对象实例,相当于调用一个新方法,需要将当前目标压栈
                self.stack.append( self.target )
                self.sendval = None
                self.target = result
            else:
                # 如果结果不是一个生成器对象实例,相当于新方法调用返回,需要废弃当前目标,并弹出栈顶作为新目标
                if not self.stack : return
                self.sendval = result
                self.target = self.stack.pop()
                
        except StopIteration:
            # 当前协程已经终止,需要移除,并弹出栈顶作为新的目标
            if not self.stack: raise
            self.sendval = None
            self.target = self.stack.pop()
 
# 模拟系统调用
class SystemCall( object ):
    def handle( self, sched, task ):
        pass
# 读写“系统调用”实现
class ReadWait( SystemCall ):
    def __init__( self, f ):
        self.file = f
    def handle( self, sched, task ):
        fd = self.file.fileno()
        sched.readwait( task, fd )
class WriteWait( SystemCall ):
    def __init__( self, f ):
        self.file = f
    def handle( self, sched, task ):
        fd = self.file.fileno()
        sched.writewait( task, fd )
 
# 调度程序,相当于操作系统的调度例程
class Scheduler( object ):
    def __init__( self ):
        # 这个队列相当于操作系统的进程集
        self.task_queue = collections.deque()
        self.read_waiting = {}
        self.write_waiting = {}
        self.taskcount = 0
    def new( self, target ):
        newtask = Tasklet( target )
        self.schedule( newtask )
        self.taskcount += 1
    def schedule( self, task ):
        self.task_queue.append( task )
    def readwait( self, task, fd ):
        self.read_waiting[fd] = task
    def writewait( self, task, fd ):
        self.write_waiting[fd] = task
    def mainloop( self, count=-1, timeout=None ):
        while self.taskcount:
            # 如果有I/O事件的队列,那么先轮询I/O事件
            if self.read_waiting or self.write_waiting:
                # 如果进程队列为空则等待时间为timeout,否则等待时间为0
                # 队列为空的场景:没有创建进程;所有进程都在I/O等待集上
                # timeout默认为一直等待,直到有可能的描述符
                wait = 0 if self.task_queue else timeout
                # 查看一组文件描述符的输入、输出、异常状态。返回输入、输出、异常准备就绪的列表的元组
                # 前三个参数是整数描述符的列表;或者带有fileno()方法的对象(该方法返回文件描述符)
                # wait不指定会一直等待直到至少一个文件描述符准备好为止,为0则指进行一次轮询即返回
                r, w , e = select.select( self.read_waiting, self.write_waiting, [], wait )
                # 将就绪的文件描述符从等待集中移除,加入到正常调度集中
                for fd in r:
                    self.schedule( self.read_waiting.pop( fd ) )
                for fd in w:
                    self.schedule( self.write_waiting.pop( fd ) )
            # 逐个执行队列上的任务
            while self.task_queue:
                # 取出一个任务,从进程列表中移除
                task = self.task_queue.popleft()
                try:
                    # 执行这个任务
                    result = task.run()
                    if isinstance( result, SystemCall ):
                        # 模拟系统调用陷入内核
                        result.handle( self, task )
                    else:
                        # 其它的,要么相当于方法调用,要么相当于方法返回,继续调度
                        self.schedule( task )
                except StopIteration :
                    # 不需要再考虑此任务,其生命周期已经结束
                    self.taskcount -= 1
            else:
                if count > 0: count -= 1
                if count == 0:
                    return
 
# Echo服务器协程
from socket import socket, AF_INET, SOCK_STREAM
def EchoServer( host, port , sched ):
    # 服务器监听套接字
    s = socket( AF_INET, SOCK_STREAM )
    s.bind( ( host , port ) )
    logging.debug( 'EchoServer listening on %s:%d', host, port )
    s.listen( 128 )
    while True:
        # 等待服务器监听套接字可读
        yield ReadWait( s )
        conn, addr = s.accept()
        logging.debug( 'Client connected: %s', addr )
        sched.new( EchoSocket( conn ) )
def EchoSocket( conn ):
    while True:
        # 等待套接字可读
        yield ReadWait( conn )
        chunk = conn.recv( 1024 )
        if chunk:
            logging.debug( 'Received message: %s', chunk )
            yield WriteWait( conn )
            conn.send( chunk )
            
 
if __name__ == '__main__':
    sched = Scheduler()
    sched.new( EchoServer( '0.0.0.0', 1918, sched ) )
    sched.mainloop( -1, None )
← Python并发编程
服务定位器模式 →

Leave a Reply Cancel reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code class="" title="" data-url=""> <del datetime=""> <em> <i> <q cite=""> <strike> <strong> <pre class="" title="" data-url=""> <span class="" title="" data-url="">

Related Posts

  • Python学习笔记
  • Python并发编程
  • HTTP协议学习笔记
  • Linux网络知识集锦
  • 使用Python进行文本处理

Recent Posts

  • Investigating and Solving the Issue of Failed Certificate Request with ZeroSSL and Cert-Manager
  • A Comprehensive Study of Kotlin for Java Developers
  • 背诵营笔记
  • 利用LangChain和语言模型交互
  • 享学营笔记
ABOUT ME

汪震 | Alex Wong

江苏淮安人,现居北京。目前供职于腾讯云,专注容器方向。

GitHub:gmemcc

Git:git.gmem.cc

Email:gmemjunk@gmem.cc@me.com

ABOUT GMEM

绿色记忆是我的个人网站,域名gmem.cc中G是Green的简写,MEM是Memory的简写,CC则是我的小天使彩彩名字的简写。

我在这里记录自己的工作与生活,同时和大家分享一些编程方面的知识。

GMEM HISTORY
v2.00:微风
v1.03:单车旅行
v1.02:夏日版
v1.01:未完成
v0.10:彩虹天堂
v0.01:阳光海岸
MIRROR INFO
Meta
  • Log in
  • Entries RSS
  • Comments RSS
  • WordPress.org
Recent Posts
  • Investigating and Solving the Issue of Failed Certificate Request with ZeroSSL and Cert-Manager
    In this blog post, I will walk ...
  • A Comprehensive Study of Kotlin for Java Developers
    Introduction Purpose of the Study Understanding the Mo ...
  • 背诵营笔记
    Day 1 Find Your Greatness 原文 Greatness. It’s just ...
  • 利用LangChain和语言模型交互
    LangChain是什么 从名字上可以看出来,LangChain可以用来构建自然语言处理能力的链条。它是一个库 ...
  • 享学营笔记
    Unit 1 At home Lesson 1 In the ...
  • K8S集群跨云迁移
    要将K8S集群从一个云服务商迁移到另外一个,需要解决以下问题: 各种K8S资源的迁移 工作负载所挂载的数 ...
  • Terraform快速参考
    简介 Terraform用于实现基础设施即代码(infrastructure as code)—— 通过代码( ...
  • 草缸2021
    经过四个多月的努力,我的小小荷兰景到达极致了状态。

  • 编写Kubernetes风格的APIServer
    背景 前段时间接到一个需求做一个工具,工具将在K8S中运行。需求很适合用控制器模式实现,很自然的就基于kube ...
  • 记录一次KeyDB缓慢的定位过程
    环境说明 运行环境 这个问题出现在一套搭建在虚拟机上的Kubernetes 1.18集群上。集群有三个节点: ...
  • eBPF学习笔记
    简介 BPF,即Berkeley Packet Filter,是一个古老的网络封包过滤机制。它允许从用户空间注 ...
  • IPVS模式下ClusterIP泄露宿主机端口的问题
    问题 在一个启用了IPVS模式kube-proxy的K8S集群中,运行着一个Docker Registry服务 ...
  • 念爷爷
      今天是爷爷的头七,十二月七日、阴历十月廿三中午,老人家与世长辞。   九月初,回家看望刚动完手术的爸爸,发

  • 6 杨梅坑

  • liuhuashan
    深圳人才公园的网红景点 —— 流花山

  • 1 2020年10月拈花湾

  • 内核缺陷触发的NodePort服务63秒延迟问题
    现象 我们有一个新创建的TKE 1.3.0集群,使用基于Galaxy + Flannel(VXLAN模式)的容 ...
  • Galaxy学习笔记
    简介 Galaxy是TKEStack的一个网络组件,支持为TKE集群提供Overlay/Underlay容器网 ...
TOPLINKS
  • Zitahli's blue 91 people like this
  • 梦中的婚礼 64 people like this
  • 汪静好 61 people like this
  • 那年我一岁 36 people like this
  • 为了爱 28 people like this
  • 小绿彩 26 people like this
  • 彩虹姐姐的笑脸 24 people like this
  • 杨梅坑 6 people like this
  • 亚龙湾之旅 1 people like this
  • 汪昌博 people like this
  • 2013年11月香山 10 people like this
  • 2013年7月秦皇岛 6 people like this
  • 2013年6月蓟县盘山 5 people like this
  • 2013年2月梅花山 2 people like this
  • 2013年淮阴自贡迎春灯会 3 people like this
  • 2012年镇江金山游 1 people like this
  • 2012年徽杭古道 9 people like this
  • 2011年清明节后扬州行 1 people like this
  • 2008年十一云龙公园 5 people like this
  • 2008年之秋忆 7 people like this
  • 老照片 13 people like this
  • 火一样的六月 16 people like this
  • 发黄的相片 3 people like this
  • Cesium学习笔记 90 people like this
  • IntelliJ IDEA知识集锦 59 people like this
  • 基于Kurento搭建WebRTC服务器 38 people like this
  • Bazel学习笔记 37 people like this
  • PhoneGap学习笔记 32 people like this
  • NaCl学习笔记 32 people like this
  • 使用Oracle Java Mission Control监控JVM运行状态 29 people like this
  • Ceph学习笔记 27 people like this
  • 基于Calico的CNI 27 people like this
Tag Cloud
ActiveMQ AspectJ CDT Ceph Chrome CNI Command Cordova Coroutine CXF Cygwin DNS Docker eBPF Eclipse ExtJS F7 FAQ Groovy Hibernate HTTP IntelliJ IO编程 IPVS JacksonJSON JMS JSON JVM K8S kernel LB libvirt Linux知识 Linux编程 LOG Maven MinGW Mock Monitoring Multimedia MVC MySQL netfs Netty Nginx NIO Node.js NoSQL Oracle PDT PHP Redis RPC Scheduler ServiceMesh SNMP Spring SSL svn Tomcat TSDB Ubuntu WebGL WebRTC WebService WebSocket wxWidgets XDebug XML XPath XRM ZooKeeper 亚龙湾 单元测试 学习笔记 实时处理 并发编程 彩姐 性能剖析 性能调优 文本处理 新特性 架构模式 系统编程 网络编程 视频监控 设计模式 远程调试 配置文件 齐塔莉
Recent Comments
  • qg on Istio中的透明代理问题
  • heao on 基于本地gRPC的Go插件系统
  • 黄豆豆 on Ginkgo学习笔记
  • cloud on OpenStack学习笔记
  • 5dragoncon on Cilium学习笔记
  • Archeb on 重温iptables
  • C/C++编程:WebSocketpp(Linux + Clion + boostAsio) – 源码巴士 on 基于C/C++的WebSocket库
  • jerbin on eBPF学习笔记
  • point on Istio中的透明代理问题
  • G on Istio中的透明代理问题
  • 绿色记忆:Go语言单元测试和仿冒 on Ginkgo学习笔记
  • point on Istio中的透明代理问题
  • 【Maven】maven插件开发实战 – IT汇 on Maven插件开发
  • chenlx on eBPF学习笔记
  • Alex on eBPF学习笔记
  • CFC4N on eBPF学习笔记
  • 李运田 on 念爷爷
  • yongman on 记录一次KeyDB缓慢的定位过程
  • Alex on Istio中的透明代理问题
  • will on Istio中的透明代理问题
  • will on Istio中的透明代理问题
  • haolipeng on 基于本地gRPC的Go插件系统
  • 吴杰 on 基于C/C++的WebSocket库
©2005-2025 Gmem.cc | Powered by WordPress | 京ICP备18007345号-2