Help us understand the problem. What is going on with this article?

第3回 pythonとPYNQ PL資源へのアクセス

はじめに

PYNQが動き、Jupyter notebookの使い方がわかったところで、いよいよロジック資源へのアクセス方法へ。

第1回 Setup PYNQ-Z1
第2回 はじめてのJupyter
第3回 pythonとPYNQ PL資源へのアクセス
第4回 PYNQ OVERLAY
第5回 PYNQのOverlayを作ってみた
第6回 カスタムIPを作ってPYNQ overlayに組み込む
第7回 PYNQのHDMI出力overlayを作る
番外編 python ジャンプスタート

更新 2019/9/22 Numpyのデータをロジックと共有する方法

Python Environment

今回よむのはPython Environment。最初の例は素数を数えるpythonのコードの例。簡単にコードがかけるというデモ。
次の例が本題。

[重要] Numpyのデータをロジックと共有する方法

Xlnkモジュールを用いると、ARMプロセッサのメモリ空間にpythonのnumpy パッケージでアクセスできるバッファを作ることができます。そのバッファの物理アドレスのポインタを取り出し、GPIOを経由してロジックに知らせることで、ARMとロジックでデータを共有します。

バッファ取得関数の定義
import numpy as np
import pynq

def get_pynq_buffer(shape, dtype):
    return pynq.Xlnk().cma_array(shape, dtype)

この関数でnumpyからもロジック領域からもアクセスできる連続領域を確保。

この関数を用いて4x4のuint32のメモリを確保して、その物理アドレスをGPIOを経由してロジックの渡すことで、ARMとロジックの間でデータを共有することができるようになります。

uint32の4x4配列バッファを確保
buffer = get_pynq_buffer(shape=(4,4), dtype=np.uint32)
物理アドレスを16進数で確認するには
pl_buffer_address = hex(buffer.physical_address)

MMIOを用いて、GPIO経由で物理アドレスを渡す。MMIOは第5回 PYNQのOverlayを作ってみたを参照。

GPIO_0でbufferの物理アドレスを送る例
from pynq import MMIO
mmio=MMIO(int(PL.ip_dict["axi_gpio_0"]['phys_addr']),4,True)
mmio.write(0, buffer.physical_address)

ロジックからは物理アドレスへの参照は、ロジックがマスターとなるAXIバス経由でDDRメモリにアクセスする。

非同期IOの統合

サンプルを読む

サンプルはこんな感じ

import asyncio
import random
import time

# Coroutine
async def wake_up(delay):
    '''数秒sleepして起きる関数
    '''
    start_time = time.time()
    print(f'The time is: {time.strftime("%I:%M:%S")}')

    print(f"Suspending coroutine 'wake_up' at 'await` statement\n")
    await asyncio.sleep(delay)

    print(f"Resuming coroutine 'wake_up' from 'await` statement")
    end_time = time.time()
    sleep_time = end_time - start_time
    print(f"'wake-up' was suspended for precisely: {sleep_time} seconds")

時刻を記録し、指定した時間だけスリープし、wakeしたらwake時刻を記録し、その差分から実際にsleepしていた時間を表示するのか。ふむふむ。えっ!コルーチンってなに?

coroutine

(参考1) @kawmraさん Kotlin の Coroutine を概観する
ふむふむ。非同期呼び出しするサブルーチンのようなものかな。スレッドは、初期化やら起動停止、イベントやセマフォやらなにかと手間がかかる。そのあたりが簡単に。
(参考2) @icoxfog417さん Pythonにおける非同期処理: asyncio逆引きリファレンス

  • async付きで定義した関数を呼び出すと、corutineが戻り、実行はされない。theadの宣言みたいなものか。
  • corutineの関数の中ではawait でいろいろイベント待ちをする感じ。pthread_wait_condとか、mutexの類かな。
  • asyncio.get_event_loop()は、タスクスイッチャみたいなものか。

pthreadと対応づけた解説があればいいのに。このへんにして、またあとで読もう。

delay = random.randint(1,5)
my_event_loop = asyncio.get_event_loop()

try:
    print("Creating task for coroutine 'wake_up'\n")
    wake_up_task = my_event_loop.create_task(wake_up(delay))
    my_event_loop.run_until_complete(wake_up_task)
except RuntimeError as err:
    print (f'{err}' +
        ' - restart the Jupyter kernel to re-run the event loop')
finally:
    my_event_loop.close()

あれ?割り込みの話はどこ?

まとめ

このページ、PYNQのPLとの関係は get_pynq_buffer()だけだった。次にいこう!

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした