博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
tornado源码分析(二): ioloop
阅读量:4497 次
发布时间:2019-06-08

本文共 5462 字,大约阅读时间需要 18 分钟。

这篇文章来写tornado的ioloop,由于看的源码是1.0 可能和现在的版本有一些出入
在读源码之前需要有一些socket、tcp/ip和http协议的概念。
ioloop是tornado的核心主要实现了事件与回调函数的循环
说到事件就不得不说epoll事件模型,当初读源码的时候这部分看了很长时间。
首先说说epoll:
    epoll的定义: epoll是Linux内核为处理大批量句柄而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著减少程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。现在十分火的nginx服务器就是用epoll作为事件模型。
    对于tornado的学习有一个概念非常重要,就是事件的循环。类似于js的工作方式(个人理解), 一般会有一个主线程来作为事件循环,当有事件发生时,可能并不会马上做出相应的处理,而是在下一个循环中去做相应的处理。
 
下面是ioloop的几个主要函数
 
大概的意思根据名字就能看出来, 其中add_handler, update_handler, remove_handler文件描述符和对应的回调函数
最开始几行是静态变量,定义了一些常量,这里面epoll有两种模式LT和ET,其中tornado使用LT模式
主要的两个方法是__init__初始化和start函数
 
初始化函数:
# 开始事件循环, 这里面因为EPOLL需要绑定文件描述符, 使用管道来代替. # 之所以新创建一个管道来绑定epoll事件, 是因为tornado有一个事件循环, 所以使用管道来触发epoll是可以快速响应的, 而不是等待0.2秒的循环# epoll是当对文件描述符进行读写的时候才会触发对应的事件, # 只对事件进行modify(epoll_ctl的绑定)时触发一次事件(待验证)def __init__(self, impl=None):        self._impl = impl or _poll()        if hasattr(self._impl, 'fileno'):            self._set_close_exec(self._impl.fileno())        self._handlers = {}        self._events = {}        self._callbacks = set()        self._timeouts = []        self._running = False        self._stopped = False        self._blocking_log_threshold = None        # Create a pipe that we send bogus data to when we want to wake        # the I/O loop when it is idle        if os.name != 'nt':            r, w = os.pipe()            self._set_nonblocking(r)            self._set_nonblocking(w)            self._set_close_exec(r)            self._set_close_exec(w)            self._waker_reader = os.fdopen(r, "r", 0)            self._waker_writer = os.fdopen(w, "w", 0)        else:            self._waker_reader = self._waker_writer = win32_support.Pipe()            r = self._waker_writer.reader_fd        #注册self._read_waker方法, 文件描述为新建管道的读标志符, 事件为epoll_read        #当向新创建的管道写入字符时, 就可以触发epoll事件, 然后_read_waker将写入的字符读取出来        #目的是为了添加下一个循环的方法时快速触发epoll事件来执行下一个循环        self.add_handler(r, self._read_waker, self.READ)
start函数中主要的事件循环, 逻辑还是比较清晰的:
while True:            # Never use an infinite timeout here - it can stall epoll            poll_timeout = 0.2            # Prevent IO event starvation by delaying new callbacks            # to the next iteration of the event loop.            callbacks = list(self._callbacks)            for callback in callbacks:                # A callback can add or remove other callbacks                if callback in self._callbacks:                    self._callbacks.remove(callback)                    self._run_callback(callback)            if self._callbacks:                poll_timeout = 0.0            #timeout回调函数, 使用最小堆            if self._timeouts:                now = time.time()                while self._timeouts and self._timeouts[0].deadline <= now:                    timeout = self._timeouts.pop(0)                    self._run_callback(timeout.callback)                if self._timeouts:                    milliseconds = self._timeouts[0].deadline - now                    poll_timeout = min(milliseconds, poll_timeout)            if not self._running:                break            if self._blocking_log_threshold is not None:                # clear alarm so it doesn't fire while poll is waiting for                # events.                signal.setitimer(signal.ITIMER_REAL, 0, 0)            #获取epoll的触发事件, 如果有callback则不设置过期时间,            #如果没有callback则设置过期时间为0.2秒, 开始下一个循环            #poll函数返回文件描述符和对应的事件            try:                event_pairs = self._impl.poll(poll_timeout)            except Exception, e:                # Depending on python version and IOLoop implementation,                # different exception types may be thrown and there are                # two ways EINTR might be signaled:                # * e.errno == errno.EINTR                # * e.args is like (errno.EINTR, 'Interrupted system call')                if (getattr(e, 'errno') == errno.EINTR or                    (isinstance(getattr(e, 'args'), tuple) and                     len(e.args) == 2 and e.args[0] == errno.EINTR)):                    logging.warning("Interrupted system call", exc_info=1)                    continue                else:                    raise            if self._blocking_log_threshold is not None:                signal.setitimer(signal.ITIMER_REAL,                                 self._blocking_log_threshold, 0)            # Pop one fd at a time from the set of pending fds and run            # its handler. Since that handler may perform actions on            # other file descriptors, there may be reentrant calls to            # this IOLoop that update self._events            self._events.update(event_pairs)            while self._events:                fd, events = self._events.popitem()                try:                    self._handlers[fd](fd, events)                except (KeyboardInterrupt, SystemExit):                    raise                except (OSError, IOError), e:                    if e[0] == errno.EPIPE:                        # Happens when the client closes the connection                        pass                    else:                        logging.error("Exception in I/O handler for fd %d",                                      fd, exc_info=True)                except:                    logging.error("Exception in I/O handler for fd %d",                                  fd, exc_info=True)
当poll之后调用之前注册的文件描述符的回调函数即可
这里我对signal这部分是做什么的不是很理解, 希望有知道的可以给我讲一下

转载于:https://www.cnblogs.com/nobuta/archive/2013/04/17/3027361.html

你可能感兴趣的文章
Struts 2基础知识
查看>>
SQL语法
查看>>
适配器模式(默认适配器)
查看>>
Nginx 配置简述
查看>>
NPOI 导入excel
查看>>
字符测试与映射函数 ctype.h
查看>>
GET请求和POST请求的区别
查看>>
Android的按钮四种点击事件
查看>>
nodejs 不是单线程
查看>>
MacOS使用zsh & oh-my-zsh
查看>>
Java 8
查看>>
Javascript异步机制
查看>>
无序数组排序后的最大相邻差值
查看>>
CSS——img标签消除3px
查看>>
如何得到yum的rpm包
查看>>
Swift 设置导航栏透明
查看>>
机器学习的一些常用算法
查看>>
蘑菇街基于Docker的私有云实践
查看>>
堆和优先队列
查看>>
宽度优先搜索
查看>>