深入理解 tornado 之 底层 ioloop 实现(三) - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
rapospectre
V2EX    Python

深入理解 tornado 之 底层 ioloop 实现(三)

  •  
  •   rapospectre
    bluedazzle 2016 年 6 月 7 日 4091 次点击
    这是一个创建于 3505 天前的主题,其中的信息可能已经有所发展或是发生改变。

    承接之前的文章:深入理解 tornado 之 底层 ioloop 实现(二)

    start

    ioloop 最核心的部分:

    def start(self): if self._running: # 判断是否已经运行 raise RuntimeError("IOLoop is already running") self._setup_logging() if self._stopped: self._stopped = False # 设置停止为假 return old_current = getattr(IOLoop._current, "instance", None) IOLoop._current.instance = self self._thread_ident = thread.get_ident() # 获得当前线程标识符 self._running = True # 设置运行 old_wakeup_fd = None if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix': try: old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno()) if old_wakeup_fd != -1: signal.set_wakeup_fd(old_wakeup_fd) old_wakeup_fd = None except ValueError: old_wakeup_fd = None try: while True: # 服务器进程正式开始,类似于其他服务器的 serve_forever with self._callback_lock: # 加锁,_callbacks 做为临界区不加锁进行读写会产生脏数据 callbacks = self._callbacks # 读取 _callbacks self._callbacks = []. # 清空 _callbacks due_timeouts = [] # 用于存放这个周期内已过期( 已超时 )的任务 if self._timeouts: # 判断 _timeouts 里是否有数据 now = self.time() # 获取当前时间,用来判断 _timeouts 里的任务有没有超时 while self._timeouts: # _timeouts 有数据时一直循环, _timeouts 是个最小堆,第一个数据永远是最小的, 这里第一个数据永远是最接近超时或已超时的 if self._timeouts[0].callback is None: # 超时任务无回调 heapq.heappop(self._timeouts) # 直接弹出 self._cancellations -= 1 # 超时计数器 - 1 elif self._timeouts[0].deadline <= now: # 判断最小的数据是否超时 due_timeouts.append(heapq.heappop(self._timeouts)) # 超时就加到已超时列表里。 else: break # 因为最小堆,如果没超时就直接退出循环( 后面的数据必定未超时 ) if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)): # 当超时计数器大于 512 并且 大于 _timeouts 长度一半( >> 为右移运算, 相当于十进制数据被除 2 )时,清零计数器,并剔除 _timeouts 中无 callbacks 的任务 self._cancellatiOns= 0 self._timeouts = [x for x in self._timeouts if x.callback is not None] heapq.heapify(self._timeouts) # 进行 _timeouts 最小堆化 for callback in callbacks: self._run_callback(callback) # 运行 callbacks 里所有的 calllback for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) # 运行所有已过期任务的 callback callbacks = callback = due_timeouts = timeout = None # 释放内存 if self._callbacks: # _callbacks 里有数据时 poll_timeout = 0.0 # 设置 epoll_wait 时间为 0 ( 立即返回 ) elif self._timeouts: # _timeouts 里有数据时 poll_timeout = self._timeouts[0].deadline - self.time() # 取最小过期时间当 epoll_wait 等待时间,这样当第一个任务过期时立即返回 poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) # 如果最小过期时间大于默认等待时间 _POLL_TIMEOUT = 3600 ,则用 3600 ,如果最小过期时间小于 0 就设置为 0 立即返回。 else: poll_timeout = _POLL_TIMEOUT # 默认 3600 s 等待时间 if not self._running: # 检查是否有系统信号中断运行,有则中断,无则继续 break if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) # 开始 epoll_wait 之前确保 signal alarm 都被清空( 这样在 epoll_wait 过程中不会被 signal alarm 打断 ) try: event_pairs = self._impl.poll(poll_timeout) # 获取返回的活跃事件队 except Exception as e: if errno_from_exception(e) == errno.EINTR: continue else: raise if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blockng_signal_threshold, 0) # epoll_wait 结束, 再设置 signal alarm self._events.update(event_pairs) # 将活跃事件加入 _events while self._events: fd, events = self._events.popitem() # 循环弹出事件 try: fd_obj, handler_func = self._handlers[fd] # 处理事件 handler_func(fd_obj, events) except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: pass else: self.handle_callback_exception(self._handlers.get(fd)) except Exception: self.handle_callback_exception(self._handlers.get(fd)) fd_obj = handler_func = None finally: self._stopped = False # 确保发生异常也继续运行 if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) # 清空 signal alarm IOLoop._current.instance = old_current if old_wakeup_fd is not None: signal.set_wakeup_fd(old_wakeup_fd) # 和 start 开头部分对应,但是不是很清楚作用,求老司机带带路 

    最后来看 stop:

    stop

    def stop(self): self._running = False self._stopped = True self._waker.wake() 

    这个很简单,设置判断条件,然后调用 self._waker.wake() 向 pipe 写入随意字符释放 pipe 。 over !

    总结

    噗,写了这么长,终于写完了。 经过分析,我们可以看到, ioloop 实际上是对 epoll 的封装,并加入了一些对上层事件的处理和 server 相关的底层处理。

    最后,感谢大家不辞辛苦看到这,文中理解有误的地方还请多多指教!:pray:

    原文地址

    作者:rapospectre

    2 条回复    2016-06-08 09:27:03 +08:00
    micyng
        1
    micyng  
       2016 年 6 月 7 日
    最后一点写错了,不是释放 pipe ,而是利用 pipe 的 fd 唤醒 ioloop 事件循环
    rapospectre
        2
    rapospectre  
    OP
       2016 年 6 月 8 日
    @micyng 谢谢指正!已经修改
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2862 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 26ms UTC 14:35 PVG 22:35 LAX 06:35 JFK 09:35
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86