MaDi's Blog

一個紀錄自己在轉職軟體工程師路上的學習小空間

0%

[Python] asyncio/aiohttp - 非同步程式設計

非同步適合用於io密集型的任務,舉凡是網路端的操作、爬蟲…等等。非同步指的是平行處理,無須一一等待每個任務完成才能依序進行。

以下某些內容整理自 iT邦幫忙 - python的asyncio模組 一系列的文章

同步v.s非同步

同步(Synchronous): 接到一個任務後,需要等到它完成,才能繼續執行下一個任務。

非同步(Asynchronous): 平行處理,無需等待第一個任務完成,即可執行其它的任務,只要第一個任務完成了,就回來處理。

非同步適合用在io密集型的任務,ex:網路端的操作、爬蟲…

asyncio模組

asyncio是非同步模組,使用asyncawait語法來達成非同步的效果。在函式前加上async關鍵字,來定義 協程(coroutine) ,在其中定義非同步的 任務清單(task) , 再透過 事件迴圈(Event Loop) 來進行不同任務間的切換執行,達到非同步的執行效果。

1
2
3
4
import asyncio
# jupyter notebook需要加以下兩行
import nest_asyncio
nest_asyncio.apply()
  1. 建立事件迴圈(event loop)

    1
    loop = asyncio.get_event_loop()
  2. 建立協程(coroutine)

    1
    2
    3
    4
    5
    6
    7
    async def example1(): 
    await listen_event_happend()
    當listen_event_happend()完成之後要做的事情...

    async def example2():
    await listen_event_happend()
    當listen_event_happend()完成之後要做的事情...

    協程(coroutine)函式與一般函式不同,協程指的是可以中途暫停、恢復運作的函式。

    協程可以很好的實現異步程式設計中的Event與Callback兩個概念,當我們要在一個協程(coroutine)裡中途監聽某一個Event發生後再執行後續行為時,只需要用await關鍵字來等待某個Event發生即可

    當程式遇到await關鍵字之後,會註冊一個Event:Callback到事件迴圈(Event Loop)裏頭,等待這個非同步的作業完成,才展開後續的動作

舉例來說,假設今天的listen_event_happend()設定成asyncio.sleep(1),也就是讓程式暫停一秒。

那當程式遇到await的時候就會註冊一個Event:Callback

這裡的Event指的是當await後面的函式asyncio.sleep(1)暫停完一秒之後
而這裡的Callback則會重新啟動await後續未執行的部分

  1. 建立任務列表(task list)
    1
    2
    tasks = [asyncio.ensure_future(example1()),
    asyncio.ensure_future(example2())]
    其中,asyncio.ensure_future會把協程(coroutine)對象轉換成任務(task)對象,才能被Event Loop執行。換句話說,任務(task)對象負責作為事件迴圈(Event Loop)和協程(coroutine)對象的溝通介面

所以協程(coroutine)是任務(task)的實際內容,任務(task)對象則有兩個功能,第一個是管理當前任務的狀態,可能是pending、finished、cancelled,第二個是當協程對象需要暫停時,任務對象會執行回調函數(callback),以確保非同步的順利執行。

  1. 執行事件迴圈(event loop)
    1
    2
    3
    4
    5
    #  執行參數裡的任務,等到任務完成就關閉Event Loop,不管loop裏面還有沒有其他的Task正在執行當中或是等待執行
    loop.run_until_complete(asyncio.wait(tasks))

    # 除非出現loop.stop()的程式碼,否則Event Loop就會永遠執行不會被關閉
    loop.run_forever()
    其中

asyncio.wait(tasks)會把tasks裏頭的所有任務包成一個大的任務。

loop.run_until_complete()可以傳入coroutine或Task或Future。會先包裝成task對象後才註冊到Event Loop執行

此外,還有其他Event Loop常用的API:

  1. loop.create_task(coro): 這個函數會接收一個coroutine object,並包裝成一個Task對象,同時把這個Task註冊到這個Event loop中等待執行。但是只會有建立而已,只有在呼叫loop.run_until_completeloop.run_forever()的時候才會真正被執行。coroutine object指的是coroutine function執行後回傳的物件。
  2. loop.is_running: 判斷Event Loop是否還在運行
  3. loop.is_closed(): 判斷Event Loop是否關閉
  4. loop.stop(): 停止Event Loop
  5. loop.close(): 關閉Event Loop

aiohttp模組

基於asyncio而開發的HTTP框架

跟一般爬蟲的差別僅在於用aiohttp模組裏頭的ClientSession來取代requests模組裏頭的session,此外,因為asyncio是基於async/await非同步的語法,所以用aiohttp建立後的session也要遵循非同步的寫法。

以下用爬cakeresume的網站為例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from aiohttp import ClientSession 

url = 'https://www.cakeresume.com/jobs?q=nodejs&ref=navbar_quick_link_jobs&page=2'

# 以aiohttp來建立session
async with ClientSession() as session:
async with session.get(url) as response:
text = await response.text()
doc = pq(text)
for each in doc('.job.well-list-item.well-list-item-link').items():
title = each('.job-title').text()
company = each('.page-name a').text()
for item in each('.job-list-item-tags').items():
salary = item('.info-section .job-salary-section').text()
print(f"{title}\n{company}\n{salary}\n"+"-"*60)

先前以非同步寫了一個簡單的小作品,[104/1111/cakeresume求職網爬蟲-程式碼],後續也有部署到Heroku並串接LINE聊天機器人。

Future v.s Task

Future對象

代表一個還未執行或還未完成的任務的結果,等同於javascript裏頭的promise。主要設計用途是為了異步程式,Event Loop會拿到很多待完成和未完成的任務,並一遍又一遍的進行輪詢,而這些任務都要以Future對象的結構加進Event_loop裏面

有以下四種method類別

  1. 觀察現在Future的狀態

    done() 察看這個任務是否已經完成(成功或失敗)
    cancelled() 察看這個任務是否已經被取消

  2. 指定任務的結果

    cancel() 取消任務的執行
    set_result() 判定任務成功,並指定執行完的結果
    set_exception() 判定任務失敗,並指定途中出現的exception

  3. 取得任務結果

    result() 取得任務成功時的結果,若任務未成功則為None
    exception() 取得任務失敗時的exception,若任務未失敗則為None

  4. 指定任務完成後續要進行的行為

    add_done_callback() 指定若任務完成後要執行的callback
    remove_done_callback() 取消若任務完成後要執行的callback

Task對象

Task對象是從Futrue對象繼承過來,所以Future有的方法Task都有。

Task對象有著Future對象的外殼,能被Event loop所使用,對內又能嵌入Coroutine,讓Coroutine成為這個未完成任務的實際內容。

參考