[教學] 用 MicroPython的非同步排程 asyncio 來模擬多工

對於使用 Arduino IDE來開發 NodeMCU 專案的朋友來說,要讓兩個以上的任務,在某個時間內,按照一定的時間間隔,個別運作且同時被執行,程式不太容易實現。舉例來說,像下圖這樣三個任務,綠色任務每一秒讀取溫度感測器把結果輸出到UART;黃色任務,每隔兩秒閃一下;藍色任務檢查Pin2 的按鈕是否被按下,然後結束所有工作。而且這三個任務,個別按照一定的時間間隔,重複執行、而且同時運作。在Arduino IDE裡面,要怎麼寫?


光想著要在 Loop () 裡面三個任務的順序,就沒有這麼容易,更不用說三個的時間差跟每一輪 Loop 的關係了。

找出三個任務時間的最小公因數時間,用個 Timer Interrupt 來固定週期執行,然後把時間比較長的任務,以次數計算跳過幾次週期再執行一次,這樣或許是個辦法。但像黃色任務這樣,中間還需要用 delay()暫停 500ms 的任務,還需要拆成兩個任務,會讓程式設計更加複雜。

對於大部分的 MCU來說,包含 NodeMCU,因為構造簡單,大多都沒有多工的能力,沒辦法像在電腦上面寫程式,開 Task 或 Process,就可以多工執行。對於上圖這麼簡單的功能,寫起來也挺費事的。

MicroPython 提供 uasyncio 模組

Asyncio 是 Python 在 PEP 492 (Python 3.5) 開始提供的功能,可以一定程度提供「模擬多工」的程式設計,它的概念是在每個任務中,執行 asyncio.sleep(second) ,並把這個時間,讓出來給其他的同時在運作的任務執行,而不會像 Arduino IDE 的 delay() 會佔用 CPU 的執行。可以讓程式開發,稍微容易一點。

以下是一個可以在 Python 3.0 裡面執行的程式,示範使用 CPython async 模組的方法,你可以自行在有安裝Python的電腦實驗一下: 

import asyncio

async def task1():

    # 這個任務什麼都不做,一進來就讓出3秒時間給其他任務
    await asyncio.sleep(3)#停三秒
    print('Task1 Done!')
async def task2():
    # 這個任務跑20個迴圈,每個迴圈讓出50ms給其他任務,總一秒鐘後完成
    for i in range(20):#
        await asyncio.sleep(0.05)#每個迴圈暫停 50ms,並把這個時間交給別的 Task
    print('Task2 Done!')
async def main():
    print(f'{time.ctime()} Hello! Async Loop Started!')
    # Wait for at most 1 second
    alltasks = asyncio.gather(task1(), task2())
    try:
        #await asyncio.wait_for(alltasks, timeout=2.0)#會在完成前 Time Out
        await asyncio.wait_for(alltasks, timeout=5.0)#完成後才 Time Out
    except asyncio.TimeoutError:
        print(f'{time.ctime()} Task Time Out!')
        try:
            await alltasks
        except asyncio.CancelledError:
            print("Main Async Loop Was Cancelled!")
    print(f'{time.ctime()} Goodbye!')
    
asyncio.run(main())

MicroPython 自從 v1.13 版開始,提供更相容於 CPython 的 uasyncio 模組,幾乎上面這個 CPython 程式裡面的功能,都能夠在 MicroPython裡面實現。並直接提供 uasyncio.sleep_ms() 這個以毫秒(millisecond) 出讓時間的方法,讓MCU多工任務排程的程式寫起來更加簡單方便。且不論是否要寫出像前面那麼複雜的三個任務的程式,就算退一步來說,寫個在背景讓蜂鳴器發聲,或者讓 LED 閃燈的非同步任務,讓警告音任務,不會造成主程式的嚴重延誤,那也非常值得了。

本文先不會觸碰到上面範例中,兩個任務同時執行的狀況,而先用一個任務在背景執行的狀況來示範,會讓你比較快上手。

(註:目前多執行緒 multi-thread 的模組 _thread,還在實驗階段,期待早日出來正式版,就能更進一步使用 ESP32 雙核心的優勢了!)

用 LED、有源蜂鳴器、無源蜂鳴器或喇叭,讓ESP32發出訊號

我在本文最下面的這個 MicroPython 範例,示範一個能夠讓你在不干擾主程式執行的狀況下,在程式執行過程中,可以讓 LED 發出特定訊號(等同於有源蜂鳴器發出聲音),或者讓無源蜂鳴器或喇叭,發出具有不同聲調的警示音出來。

舉例來說,你要讓 LED 閃光三下,用來表示某個錯誤訊號,你或許會使用下面的時序,來點亮、關閉LED:

亮150ms , 暗 80ms, 亮150ms , 暗 80ms, 亮150ms , 暗 80ms

若是用 time.sleep_ms() 這個函數來切換 LED 的明暗的話,你的主程式,將會被佔用  690ms,超過半秒的時間了。如果,此時主程式,正在執行一個變速的 PWM 功能,那你會明顯感受到 PWM 變化的卡頓。使用 uasyncio 非同步排程任務的寫法,就能讓 LED 在按照你設定的時序閃爍的時候,感覺不到主程式被延遲的狀況。

範例程式

這個範例,在main 的段落,示範兩段不同的範例,前半段示範只有 0, 1 區別的LED輸出,後半段則是以 PWM 輸出不同頻率的信號,可以讓無源蜂鳴器或喇叭可以發出不同音調。當然,也能用在LED,只是亮光看起來會變暗。
如果你要直接用 NodeMCU上面的 LED 來試驗這個範例程式,你可以把 numpin 先改成 13,並記得該 Pin 是沒辦法輸出 PWM 的,你只能用前半段的範例。
indicator.py

import uasyncio
from machine import Pin, PWM

class Indicator(object):
    def __init__(self, sigwarn=None, sigerr=None, sigok=None):
        self._inuse = False
        self.sig = dict()
        self._sigdur = dict()
        self._funcout = self._toneout#either _toneout or digiout
        self.pinsig = None
        
        #format sig['typesig']=[[frequency(Hz), duration(msec)], ...] each cell represent a duration of sound or led light on/off
        if sigerr:
            self.sig['err'] = sigwarn
        else:
            self.sig['err'] = [[600,150],[0,80],[600,150],[0,80],[600,150],[0,500]]#longer 500 delay to separae next signal
        if sigwarn:
            self.sig['warn'] = sigwarn
        else:
            self.sig['warn'] = [[600,160],[0,80],[600,160],[0,500]]
        if sigok:
            self.sig['ok'] = sigwarn
        else:
            self.sig['ok'] = [[500,600],[0,500]]
        self._sigdur['warn'] = sum([self.sig['warn'][i][1] for i in range(len(self.sig['warn']))])
        self._sigdur['err'] = sum([self.sig['err'][i][1] for i in range(len(self.sig['err']))])
        self._sigdur['ok'] = sum([self.sig['ok'][i][1] for i in range(len(self.sig['ok']))])
        
    @property
    def inuse(self):
        return self._inuse
    
    async def _toneout(self, typesig:str):
        for freq, msec in self.sig[typesig]:
            if freq > 0:
                self.pinsig.freq(int(freq))
                self.pinsig.duty(int(freq))
            else:
                self.pinsig.duty(0)
            await uasyncio.sleep_ms(int(msec))
        
    async def _digiout(self, typesig:str):
        for dio, msec in self.sig[typesig]:
            self.pinsig.value(int(bool(dio)))#allow using freq, freq=0, off; >0, on
            await uasyncio.sleep_ms(int(msec))
        
    async def task_out(self, typesig:str):
        self._inuse = True
        uasyncio.create_task(self._funcout(typesig))
        await uasyncio.sleep_ms(self._sigdur[typesig]+2)
        self._inuse = False
        
class SPK(Indicator):
    '''
    Class for output PWM sig, like a buzzer without build-in OSC, or a speaker
    '''
    def __init__(self, numpin:int, sigwarn=None, sigerr=None, sigok=None):
        super().__init__(sigwarn, sigerr, sigok)
        self.pinsig = PWM(Pin(numpin))
        self._funcout = self._toneout
        
    def spk(self, typesig:str):
        if not self.inuse:
            uasyncio.run(self.task_out(typesig))
    
        
class LED(Indicator):
    '''
    Class for output PWM sig, like a buzzer with build-in OSC, or a LED
    '''
    def __init__(self, numpin:int, sigwarn=None, sigerr=None, sigok=None):
        super().__init__(sigwarn, sigerr, sigok)
        self.pinsig = Pin(numpin, Pin.OUT)
        self._funcout = self._digiout
        
    def led(self, typesig:str):
        if not self.inuse:
            uasyncio.run(self.task_out(typesig))

if __name__ == "__main__":
    '''
    global ledmsg
    ledmsg = LED(numpin=23)
    ledmsg.led("warn")
    while ledmsg.inuse:
        pass
    ledmsg.led("err")
    while ledmsg.inuse:
        pass
    ledmsg.led("ok")
    while ledmsg.inuse:
        pass
    '''
    global spkmsg
    spkmsg = SPK(numpin=23)
    spkmsg.spk("warn")
    while spkmsg.inuse:
        pass
    spkmsg.spk("err")
    while spkmsg.inuse:
        pass
    spkmsg.spk("ok")
    while spkmsg.inuse:
        pass


沒有留言:

張貼留言