はじめに
PYNQが動き、Jupyter notebookの使い方がわかったところで、いよいよロジック資源へのアクセス方法へ。
第1回 Setup PYNQ-Z1
第2回 はじめてのJupyter
第3回 pythonとPYNQ PL資源へのアクセス
第4回 PYNQ OVERLAY
第5回 PYNQのOverlayを作ってみた
第6回 カスタムIPを作ってPYNQ overlayに組み込む
第7回 PYNQのHDMI出力overlayを作る
番外編 python ジャンプスタート
更新 2019/9/22 Numpyのデータをロジックと共有する方法
Python Environment
今回よむのはPython Environment。最初の例は素数を数えるpythonのコードの例。簡単にコードがかけるというデモ。
次の例が本題。
[重要] Numpyのデータをロジックと共有する方法
Xlnkモジュールを用いると、ARMプロセッサのメモリ空間にpythonのnumpy パッケージでアクセスできるバッファを作ることができます。そのバッファの物理アドレスのポインタを取り出し、GPIOを経由してロジックに知らせることで、ARMとロジックでデータを共有します。
import numpy as np
import pynq
def get_pynq_buffer(shape, dtype):
return pynq.Xlnk().cma_array(shape, dtype)
この関数でnumpyからもロジック領域からもアクセスできる連続領域を確保。
この関数を用いて4x4のuint32のメモリを確保して、その物理アドレスをGPIOを経由してロジックの渡すことで、ARMとロジックの間でデータを共有することができるようになります。
buffer = get_pynq_buffer(shape=(4,4), dtype=np.uint32)
pl_buffer_address = hex(buffer.physical_address)
MMIOを用いて、GPIO経由で物理アドレスを渡す。MMIOは第5回 PYNQのOverlayを作ってみたを参照。
from pynq import MMIO
mmio=MMIO(int(PL.ip_dict["axi_gpio_0"]['phys_addr']),4,True)
mmio.write(0, buffer.physical_address)
ロジックからは物理アドレスへの参照は、ロジックがマスターとなるAXIバス経由でDDRメモリにアクセスする。
非同期IOの統合
サンプルを読む
サンプルはこんな感じ
import asyncio
import random
import time
# Coroutine
async def wake_up(delay):
'''数秒sleepして起きる関数
'''
start_time = time.time()
print(f'The time is: {time.strftime("%I:%M:%S")}')
print(f"Suspending coroutine 'wake_up' at 'await` statement\n")
await asyncio.sleep(delay)
print(f"Resuming coroutine 'wake_up' from 'await` statement")
end_time = time.time()
sleep_time = end_time - start_time
print(f"'wake-up' was suspended for precisely: {sleep_time} seconds")
時刻を記録し、指定した時間だけスリープし、wakeしたらwake時刻を記録し、その差分から実際にsleepしていた時間を表示するのか。ふむふむ。えっ!コルーチンってなに?
coroutine
(参考1) @kawmraさん Kotlin の Coroutine を概観する
ふむふむ。非同期呼び出しするサブルーチンのようなものかな。スレッドは、初期化やら起動停止、イベントやセマフォやらなにかと手間がかかる。そのあたりが簡単に。
(参考2) @icoxfog417さん Pythonにおける非同期処理: asyncio逆引きリファレンス
- async付きで定義した関数を呼び出すと、corutineが戻り、実行はされない。theadの宣言みたいなものか。
- corutineの関数の中ではawait でいろいろイベント待ちをする感じ。pthread_wait_condとか、mutexの類かな。
- asyncio.get_event_loop()は、タスクスイッチャみたいなものか。
pthreadと対応づけた解説があればいいのに。このへんにして、またあとで読もう。
delay = random.randint(1,5)
my_event_loop = asyncio.get_event_loop()
try:
print("Creating task for coroutine 'wake_up'\n")
wake_up_task = my_event_loop.create_task(wake_up(delay))
my_event_loop.run_until_complete(wake_up_task)
except RuntimeError as err:
print (f'{err}' +
' - restart the Jupyter kernel to re-run the event loop')
finally:
my_event_loop.close()
あれ?割り込みの話はどこ?
まとめ
このページ、PYNQのPLとの関係は get_pynq_buffer()
だけだった。次にいこう!