
承接之前的文章:深入理解 tornado 之 底层 ioloop 实现(二)
ioloop 最核心的部分:
def start(self): if self._running: # 判断是否已经运行 raise RuntimeError("IOLoop is already running") self._setup_logging() if self._stopped: self._stopped = False # 设置停止为假 return old_current = getattr(IOLoop._current, "instance", None) IOLoop._current.instance = self self._thread_ident = thread.get_ident() # 获得当前线程标识符 self._running = True # 设置运行 old_wakeup_fd = None if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix': try: old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno()) if old_wakeup_fd != -1: signal.set_wakeup_fd(old_wakeup_fd) old_wakeup_fd = None except ValueError: old_wakeup_fd = None try: while True: # 服务器进程正式开始,类似于其他服务器的 serve_forever with self._callback_lock: # 加锁,_callbacks 做为临界区不加锁进行读写会产生脏数据 callbacks = self._callbacks # 读取 _callbacks self._callbacks = []. # 清空 _callbacks due_timeouts = [] # 用于存放这个周期内已过期( 已超时 )的任务 if self._timeouts: # 判断 _timeouts 里是否有数据 now = self.time() # 获取当前时间,用来判断 _timeouts 里的任务有没有超时 while self._timeouts: # _timeouts 有数据时一直循环, _timeouts 是个最小堆,第一个数据永远是最小的, 这里第一个数据永远是最接近超时或已超时的 if self._timeouts[0].callback is None: # 超时任务无回调 heapq.heappop(self._timeouts) # 直接弹出 self._cancellations -= 1 # 超时计数器 - 1 elif self._timeouts[0].deadline <= now: # 判断最小的数据是否超时 due_timeouts.append(heapq.heappop(self._timeouts)) # 超时就加到已超时列表里。 else: break # 因为最小堆,如果没超时就直接退出循环( 后面的数据必定未超时 ) if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)): # 当超时计数器大于 512 并且 大于 _timeouts 长度一半( >> 为右移运算, 相当于十进制数据被除 2 )时,清零计数器,并剔除 _timeouts 中无 callbacks 的任务 self._cancellatiOns= 0 self._timeouts = [x for x in self._timeouts if x.callback is not None] heapq.heapify(self._timeouts) # 进行 _timeouts 最小堆化 for callback in callbacks: self._run_callback(callback) # 运行 callbacks 里所有的 calllback for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) # 运行所有已过期任务的 callback callbacks = callback = due_timeouts = timeout = None # 释放内存 if self._callbacks: # _callbacks 里有数据时 poll_timeout = 0.0 # 设置 epoll_wait 时间为 0 ( 立即返回 ) elif self._timeouts: # _timeouts 里有数据时 poll_timeout = self._timeouts[0].deadline - self.time() # 取最小过期时间当 epoll_wait 等待时间,这样当第一个任务过期时立即返回 poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) # 如果最小过期时间大于默认等待时间 _POLL_TIMEOUT = 3600 ,则用 3600 ,如果最小过期时间小于 0 就设置为 0 立即返回。 else: poll_timeout = _POLL_TIMEOUT # 默认 3600 s 等待时间 if not self._running: # 检查是否有系统信号中断运行,有则中断,无则继续 break if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) # 开始 epoll_wait 之前确保 signal alarm 都被清空( 这样在 epoll_wait 过程中不会被 signal alarm 打断 ) try: event_pairs = self._impl.poll(poll_timeout) # 获取返回的活跃事件队 except Exception as e: if errno_from_exception(e) == errno.EINTR: continue else: raise if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blockng_signal_threshold, 0) # epoll_wait 结束, 再设置 signal alarm self._events.update(event_pairs) # 将活跃事件加入 _events while self._events: fd, events = self._events.popitem() # 循环弹出事件 try: fd_obj, handler_func = self._handlers[fd] # 处理事件 handler_func(fd_obj, events) except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: pass else: self.handle_callback_exception(self._handlers.get(fd)) except Exception: self.handle_callback_exception(self._handlers.get(fd)) fd_obj = handler_func = None finally: self._stopped = False # 确保发生异常也继续运行 if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) # 清空 signal alarm IOLoop._current.instance = old_current if old_wakeup_fd is not None: signal.set_wakeup_fd(old_wakeup_fd) # 和 start 开头部分对应,但是不是很清楚作用,求老司机带带路 最后来看 stop:
def stop(self): self._running = False self._stopped = True self._waker.wake() 这个很简单,设置判断条件,然后调用 self._waker.wake() 向 pipe 写入随意字符释放 pipe 。 over !
噗,写了这么长,终于写完了。 经过分析,我们可以看到, ioloop 实际上是对 epoll 的封装,并加入了一些对上层事件的处理和 server 相关的底层处理。
最后,感谢大家不辞辛苦看到这,文中理解有误的地方还请多多指教!:pray:
作者:rapospectre
1 micyng 2016 年 6 月 7 日 最后一点写错了,不是释放 pipe ,而是利用 pipe 的 fd 唤醒 ioloop 事件循环 |
2 rapospectre OP @micyng 谢谢指正!已经修改 |