Asyncio 原理分析

非同步io的本質是對系統的基本的非同步io函式epoll, selector的封裝。 但凡非同步必將涉及事件-回撥,而裸的事件回撥會使程式碼複雜,比如又要考慮出現異常怎麼辦,回撥中套回撥, 回撥間資料怎麼共享,怎麼判斷誰觸發回撥都很麻煩。

asyncio這個庫就是使用python的yield這個可以打斷儲存當前函式的上下文的機制, 封裝好了selector 擺脫掉了複雜的回撥關係

本篇主要是對參考資料那篇文章的一個筆記 ,原文章很精彩,建議大家看一下

原始的selector

class Client:

def __init__(self):

self。sock = socket。socket(socket。AF_INET, socket。SOCK_STREAM)

self。resp = b‘’

def connect(self):

self。sock。setblocking(False)

try:

self。sock。connect((“127。0。0。1”, 5003))

print(“sock connection”)

except BlockingIOError:

pass

selector。register(self。sock。fileno(), selectors。EVENT_WRITE, self。write)

def read(self, key, mask):

chunk = self。sock。recv(4096)

if chunk:

self。resp = self。resp+chunk

else:

selector。unregister(key) # 這個放著裡是一次拿4k資料 如果沒拿完就一直監聽這個事件

print(self。resp)

return

def write(self, key, mask):

print(“start write”)

selector。unregister(key)

self。sock。send(b“hello”)

print(“finish write”)

selector。register(key, selectors。EVENT_READ, self。read)

if __name__ == ‘__main__’:

c = Client()

c。connect()

while True:

events = selector。select()

for key, mask in events:

callback = key。data

callback(key。fd, mask)

基本思路就是事件-回撥的思路, 先

register

註冊一個寫事件,等能觸發了用

unregister

標誌認領了事件

使用生成器

Future

我們先設計一個Future 表達日後要完成的事情。

class Future:

def __init__(self):

self。result = None

self。_callbacks = []

def add_done_callback(self, fn):

self。_callbacks。append(fn)

def set_result(self, result):

self。result = result

for fn in self。_callbacks:

fn(self)

每次執行set_result會呼叫call_back

Task

Task是用來驅動future的

class Task:

def __init__(self, coro):

self。coro = coro

f = Future()

f。set_result(None)

self。step(f)

def step(self, future):

try:

next_future = self。coro。send(future。result)

except StopIteration:

return

next_future。add_done_callback(self。step)

Task 要做的事情就是每次讓協程接著往下走,並且在不是最後一步的時候,把下一步加到回撥裡

Main

def connect(self):

self。sock。setblocking(False)

try:

self。sock。connect((‘127。0。0。1’,30))

except BlockingIOError:

pass

f = Future()

def on_connection():

f。result(None)

selector。register(self。sock。fileno(), selectors。EVENT_WRITE, on_connection)

yield f

selector。unregister(self。sock。fileno())

self。sock。sendall(b“hello”)

while True:

f = Future()

def on_read():

f。set_result(self。sock。recv(4096))

selector。register(self。sock。fileno(), selectors。EVENT_READ, on_read)

chunk = yield f

selector。unregister(self。sock。fileno())

if chunk:

self。resp += chunk

else:

break

這裡可以看到 所有的回撥都加上了 set_result 方法 透過set_result 來呼叫繫結在future的回撥,主要就是Task給 繫結的下一步的回撥。

這樣的好處就是不用區分事件是誰觸發的了 每次都按的是同步的順序走的。

Eventloop

c = Client()

Task(c。connect())

while True:

events = selector。select()

for key, mask in events:

callback = key。data

callback()

最後寫出Eventloop是一件十分自然的事情, 先把任務加到Task裡預激勵它。然後每次觸發事件就呼叫set_result方法來呼叫Task給他 附加的step的回撥,讓協程一直往下走。這樣就用可以讓程式碼保持了同步的順序又能非同步執行了。

參考資料

【0】。深入理解python非同步程式設計(上)