Asyncio 原理分析
非同步io的本質是對系統的基本的非同步io函式epoll, selector的封裝。 但凡非同步必將涉及事件-回撥,而裸的事件回撥會使程式碼複雜,比如又要考慮出現異常怎麼辦,回撥中套回撥, 回撥間資料怎麼共享,怎麼判斷誰觸發回撥都很麻煩。
asyncio這個庫就是使用python的yield這個可以打斷儲存當前函式的上下文的機制, 封裝好了selector 擺脫掉了複雜的回撥關係
本篇主要是對參考資料那篇文章的一個筆記 ,原文章很精彩,建議大家看一下
原始的selector
class Client:
def __init__(self):
self。sock = socket。socket(socket。AF_INET, socket。SOCK_STREAM)
self。resp = b‘’
def connect(self):
self。sock。setblocking(False)
try:
self。sock。connect((“127。0。0。1”, 5003))
print(“sock connection”)
except BlockingIOError:
pass
selector。register(self。sock。fileno(), selectors。EVENT_WRITE, self。write)
def read(self, key, mask):
chunk = self。sock。recv(4096)
if chunk:
self。resp = self。resp+chunk
else:
selector。unregister(key) # 這個放著裡是一次拿4k資料 如果沒拿完就一直監聽這個事件
print(self。resp)
return
def write(self, key, mask):
print(“start write”)
selector。unregister(key)
self。sock。send(b“hello”)
print(“finish write”)
selector。register(key, selectors。EVENT_READ, self。read)
if __name__ == ‘__main__’:
c = Client()
c。connect()
while True:
events = selector。select()
for key, mask in events:
callback = key。data
callback(key。fd, mask)
基本思路就是事件-回撥的思路, 先
register
註冊一個寫事件,等能觸發了用
unregister
標誌認領了事件
使用生成器
Future
我們先設計一個Future 表達日後要完成的事情。
class Future:
def __init__(self):
self。result = None
self。_callbacks = []
def add_done_callback(self, fn):
self。_callbacks。append(fn)
def set_result(self, result):
self。result = result
for fn in self。_callbacks:
fn(self)
每次執行set_result會呼叫call_back
Task
Task是用來驅動future的
class Task:
def __init__(self, coro):
self。coro = coro
f = Future()
f。set_result(None)
self。step(f)
def step(self, future):
try:
next_future = self。coro。send(future。result)
except StopIteration:
return
next_future。add_done_callback(self。step)
Task 要做的事情就是每次讓協程接著往下走,並且在不是最後一步的時候,把下一步加到回撥裡
Main
def connect(self):
self。sock。setblocking(False)
try:
self。sock。connect((‘127。0。0。1’,30))
except BlockingIOError:
pass
f = Future()
def on_connection():
f。result(None)
selector。register(self。sock。fileno(), selectors。EVENT_WRITE, on_connection)
yield f
selector。unregister(self。sock。fileno())
self。sock。sendall(b“hello”)
while True:
f = Future()
def on_read():
f。set_result(self。sock。recv(4096))
selector。register(self。sock。fileno(), selectors。EVENT_READ, on_read)
chunk = yield f
selector。unregister(self。sock。fileno())
if chunk:
self。resp += chunk
else:
break
這裡可以看到 所有的回撥都加上了 set_result 方法 透過set_result 來呼叫繫結在future的回撥,主要就是Task給 繫結的下一步的回撥。
這樣的好處就是不用區分事件是誰觸發的了 每次都按的是同步的順序走的。
Eventloop
c = Client()
Task(c。connect())
while True:
events = selector。select()
for key, mask in events:
callback = key。data
callback()
最後寫出Eventloop是一件十分自然的事情, 先把任務加到Task裡預激勵它。然後每次觸發事件就呼叫set_result方法來呼叫Task給他 附加的step的回撥,讓協程一直往下走。這樣就用可以讓程式碼保持了同步的順序又能非同步執行了。
參考資料
【0】。深入理解python非同步程式設計(上)