本日はプロセス間、スレッド間などで値をやり取りする方法を紹介します。
使用する手法は、C言語などを触ったことある人は聞いたことある人もいるかもしれませんが
「共有メモリ (Shared Memory)」という仕組みです。
Pythonで共有メモリを実装するのに便利な「mmap」というライブラリがあるので、
本日は紹介したいと思います。
####共有メモリとは
ざっくりですが共有メモリとは、名前の通りPCのメモリ上の一部を複数のプロセス間で使用できるようにする仕組みです。
最初はこのくらいの理解で十分だと思います。
複数のPythonアプリ間でデータの通信ができるので便利です。
応答速度もかなり高速です。
###ソースコード
本記事では共有メモリはファイルに反映される手法を利用します。
これによりアプリ終了時に、最後の状態を維持できるので便利です。
(製造現場などのアプリケーションにする場合は必須です。)
''' ---------------------------
共有メモリ管理クラス
---------------------------'''
import mmap
import os
WORD_SIZE = 2 '''1WORD = 2byte'''
MAP_SIZE = 1024 * WORD_SIZE
MAP_FILE_NAME = "./map.dat"
class classMMap:
def __init__(self) -> None:
"""コンストラクタ
"""
self._mm = None
''' map file check '''
if os.path.exists(MAP_FILE_NAME):
''' Read memory map '''
self._readMMapFile()
else:
''' create and initialize '''
self._createMMapFile()
''' Read memory map '''
self._readMMapFile()
print(" __init__ comp.")
def _createMMapFile(self):
with open(MAP_FILE_NAME, mode="wb") as file:
initStr = '00' * MAP_SIZE
initByte = bytes.fromhex(initStr)
file.write(initByte)
print("_createMMapFile fin.")
def _readMMapFile(self):
with open(MAP_FILE_NAME, mode="r+b") as file:
self._mm = mmap.mmap(file.fileno(), 0)
self._mm.seek(0)
print("_readMMapFile fin.")
def ReadShort(self, adr:int):
"""指定したアドレスの値を読み込む
(読み込みデータ最大値 : 1WORD = 2bytes = 65535)
Args:
adr : 読み込みたいアドレス
"""
try:
self._mm.seek(adr*WORD_SIZE)
bytes = self._mm.read(WORD_SIZE)
val = int.from_bytes(bytes, 'little',signed=True)
self._mm.seek(0)
return val
except Exception as e:
print("ReadShort except" + str(e).replace('\n',''))
return
def WriteShort(self, adr:int, data) -> None:
"""指定したアドレスに値を書き込む
(書き込みデータ最大値 : 1WORD = 2bytes = 65535)
Args:
adr (int): 書き込みたいアドレス
data (short): 書き込むデータ
"""
if data >= (2**(8*WORD_SIZE)):
print("Error. Over 2bytes (65535)")
return
try:
ddata = int(data)
bytes = ddata.to_bytes(WORD_SIZE, 'little',signed=True)
for i in range(WORD_SIZE):
self._mm[adr*WORD_SIZE + i] = bytes[i]
except Exception as e:
print("WriteShort except" + str(e).replace('\n',''))
def despose(self):
self._mm.close()
###共有メモリ使用例
これから実際に使用するサンプルも紹介します。(今回はスレッド間でのデータやり取りで例を示そうと思います。)
import time
import threading
from ClsMmap import classMMap ''' 上で作成した共有メモリ操作クラス '''
TEST_ADR = 100
def func1():
"""100番アドレスを1秒ごとに読み出す
"""
while True:
print(clsMMap.ReadShort(TEST_ADR))
time.sleep(1)
def func2():
"""100番アドレスを1秒ごとに書き換える(1 ~ 10000の範囲)
"""
while True:
curVal = clsMMap.ReadShort(TEST_ADR)
clsMMap.WriteShort(TEST_ADR, (curVal + 1) % 10000) ''' 値のインクリメント '''
time.sleep(1)
if __name__ == '__main__':
''' 共有メモリ管理クラス インスタンス生成 '''
clsMMap = classMMap()
''' 読み込み用スレッド '''
thread1 = threading.Thread(target=func1)
thread1.start()
''' 書き込み用スレッド '''
thread2 = threading.Thread(target=func2)
thread2.start()
※実務で利用する場合は排他処理や例外処理をもう少し実装する必要があります。
mmapのClose()処理も適宜呼び出してください。
Close()をしない場合は、プログラム終了時、最後の状態を自動的に
共有メモリファイルに書き込んでくれます。
以上が今回の内容となります。是非参考にしてください!