[Python] RPC实现,,单线程同步使用soc


单线程同步

使用socket传输数据使用json序列化消息体struct将消息编码为二进制字节串,进行网络传输

消息协议

技术图片
 1 // 输入 2 { 3     in: "ping", 4     params: "ireader 0" 5 } 6  7 // 输出 8 { 9     out: "pong",10     result: "ireader 0"11 }
View Code

客户端 client.py

技术图片
 1 # coding: utf-8 2 # client.py 3  4 import json 5 import time 6 import struct 7 import socket 8  9 10 def rpc(sock, in_, params):11     response = json.dumps({"in": in_, "params": params})  # 请求消息体12     length_prefix = struct.pack("I", len(response)) # 请求长度前缀13     sock.sendall(length_prefix)14     sock.sendall(response)15     length_prefix = sock.recv(4)  # 响应长度前缀16     length, = struct.unpack("I", length_prefix)17     body = sock.recv(length) # 响应消息体18     response = json.loads(body)19     return response["out"], response["result"]  # 返回响应类型和结果20 21 if __name__ == ‘__main__‘:22     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)23     s.connect(("localhost", 8080))24     for i in range(10): # 连续发送10个rpc请求25         out, result = rpc(s, "ping", "ireader %d" % i)26         print out, result27         time.sleep(1)  # 休眠1s,便于观察28     s.close() # 关闭连接
View Code

技术图片

服务端 blocking_single.py

技术图片
 1 # coding: utf8 2 # blocking_single.py 3  4 import json 5 import struct 6 import socket 7  8  9 def handle_conn(conn, addr, handlers):10     print addr, "comes"11     while True:  # 循环读写12         length_prefix = conn.recv(4)  # 请求长度前缀13         if not length_prefix:  # 连接关闭了14             print addr, "bye"15             conn.close()16             break  # 退出循环,处理下一个连接17         length, = struct.unpack("I", length_prefix)18         body = conn.recv(length)  # 请求消息体  19         request = json.loads(body)20         in_ = request[‘in‘]21         params = request[‘params‘]22         print in_, params23         handler = handlers[in_]  # 查找请求处理器24         handler(conn, params)  # 处理请求25 26 27 def loop(sock, handlers):28     while True:29         conn, addr = sock.accept()  # 接收连接30         handle_conn(conn, addr, handlers)  # 处理连接31 32 33 def ping(conn, params):34     send_result(conn, "pong", params)35 36 37 def send_result(conn, out, result):38     response = json.dumps({"out": out, "result": result})  # 响应消息体39     length_prefix = struct.pack("I", len(response))  # 响应长度前缀40     conn.sendall(length_prefix)41     conn.sendall(response)42 43 44 if __name__ == ‘__main__‘:45     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 创建一个TCP套接字46     sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # 打开reuse addr选项47     sock.bind(("localhost", 8080)) # 绑定端口48     sock.listen(1)  # 监听客户端连接49     handlers = {  # 注册请求处理器50         "ping": ping51     }52     loop(sock, handlers)  # 进入服务循环
View Code

技术图片

多线程同步

使用线程库thread创建原生线程服务器可并行处理多个客户端

服务端 multithread.py

技术图片View Code

技术图片

多进程同步

Python的GIL导致单个进程只能占满一个CPU核心,多线程无法利用多核优势os.fork()会生成子进程子进程退出后,父进程需使用waitpid系统调用收割子进程,防止其称为僵尸资源在子进程中关闭服务器套接字后,在父进程中也要关闭服务器套接字因为进程fork后,父子进程都有自己的套接字引用指向内核的同一份套接字对象,套接字引用计数为2,对套接字进程close,即将套接字对象的引用计数减1

服务端 multiprocess.py

技术图片View Code

PreForking同步

进程比线程耗费资源,通过PreForking进程池模型对服务器开辟的进程数量进行限制,避免服务器负载过重如果并行的连接数量超过了prefork进程数量,后来的客户端请求将会阻塞

单进程异步

通过事件轮询API,查询相关套接字是否有响应的读写事件,有则携带事件列表返回,没有则阻塞拿到读写事件后,可对事件相关的套接字进行读写操作设置读写缓冲区Nginx/Nodejs/Redis都是基于异步模型异步模型编码成本高,易出错,通常在公司业务代码中采用同步模型,仅在讲究高并发高性能的场合才使用异步模型

PreForking异步

Tornado/Nginx采用了多进程PreForking异步模型,具有良好的高并发处理能力

技术图片

参考

Python多线程和多进程

https://www.cnblogs.com/yssjun/p/11302500.html

[Python] RPC实现

评论关闭