用 asyncio 做了一个 UDP 传输性能测试工具,目前单进程服务端性能不够,流量大的时候处理不过来,服务端用的 asyncio.DatagramProtocol,怎么变成多进程的呢?试试了一下抢占式的写法,运行报错了,运行起来也只有一个进程工作,上代码
import asyncio import time import socket from multiprocessing import Process loop = asyncio.get_event_loop() size = 0 class ServerProtocol(asyncio.DatagramProtocol): def connection_made(self, transport): self.transport = transport def datagram_received(self, data, addr, args=None): global size data = data.decode() message = data index = data.find('\n') if index > 0: filename = data[0:index] data = data[index+1::] size += len(data) task = self.WriteToFile(filename, data) asyncio.run_coroutine_threadsafe(task, loop) async def WriteToFile(self, f, data): await asyncio.sleep(1) return True async def print_size(): global size while True: await asyncio.sleep(1) print(size) def start(sock): listen = loop.create_datagram_endpoint( ServerProtocol, sock = sock ) transport, protocol = loop.run_until_complete(listen) task = print_size() asyncio.run_coroutine_threadsafe(task, loop) try: loop.run_forever() except KeyboardInterrupt: pass transport.close() loop.close() sock.close() if __name__ == '__main__': print("Starting UDP server") sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind(('0.0.0.0', 9873)) for i in range(1): t = Process(target=start, args=(sock,)) t.deamon = True t.start() start(sock)
报错信息
Exception in callback BaseSelectorEventLoop._add_reader(6, <bound method..., bufsize=0>>>) handle: <Handle BaseSelectorEventLoop._add_reader(6, <bound method..., bufsize=0>>>)> Traceback (most recent call last): File "/usr/local/python36/lib/python3.6/asyncio/selector_events.py", line 264, in _add_reader key = self._selector.get_key(fd) File "/usr/local/python36/lib/python3.6/selectors.py", line 191, in get_key raise KeyError("{!r} is not registered".format(fileobj)) from None KeyError: '6 is not registered' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/python36/lib/python3.6/asyncio/events.py", line 145, in _run self._callback(*self._args) File "/usr/local/python36/lib/python3.6/asyncio/selector_events.py", line 267, in _add_reader (handle, None)) File "/usr/local/python36/lib/python3.6/selectors.py", line 412, in register self._epoll.register(key.fd, epoll_events) FileExistsError: [Errno 17] File exists
1 lieh222 OP 这。。。咋没人呢 |
2 lolizeppelin 2018-08-07 09:00:04 +08:00 via Android 把 fork 搞明白 一多进城就 multiprocessing 有没有想过要看 multiprocessing 的源码? |
3 lieh222 OP @lolizeppelin 感谢回复,已经解决,因为多进程中共用了一个事件循环,add_reader 重复注册了 socket.fileno,所以报错了,用 new_event_loop 解决,与 fork、mutiprocessing 的用法和是否看了源码没有关系,另外我认为像 fork,mutiprocessing 这种系统接口函数和系统接口的高级封装直接用就是了,除非遇到相关必须要解决的问题和抱着学习的目的,没有必要看这种源码 |