はじめに
今回私は最近はやりのchatGPTに興味を持ち、深層学習について学んでみたいと思い立ちました!
深層学習といえばPythonということなので、最終的にはPythonを使って深層学習ができるとこまでコツコツと学習していくことにしました。
ただ、勉強するだけではなく少しでもアウトプットをしようということで、備忘録として学習した内容をまとめていこうと思います。
この記事が少しでも誰かの糧になることを願っております!
※投稿主の環境はWindowsなのでMacの方は多少違う部分が出てくると思いますが、ご了承ください。
最初の記事:Python初心者の備忘録 #01
前の記事:Python初心者の備忘録 #09 ~DSに使われるライブラリ編04~
次の記事:Python初心者の備忘録 #11 ~統計学入門編01~
今回はOpenCV、glob、 os、 pathlib、 tqdm、 multiprocessingについてまとめております。
また、本記事で使用している画像は下記リポジトリで公開しているので、必要であればこちらからダウンロードしてください。
https://github.com/Yushin-Tati/Learnig-OpenCV-picture/tree/main
2024/01/29以前にDocker環境を構築した方へ
現状ではimport cv2
でエラーになると思います。
一度Dockerコンテナを停止し、ds-python
のimageを削除したうえで、#06に記載されている最新のDockerfileで環境を再構築してください。
■学習に使用している資料
Udemy:米国データサイエンティストがやさしく教えるデータサイエンスのためのPython講座
■OpneCV
▶OpenCVとは
Pythonで画像を扱うためのライブラリ。
OpenCVはC++で書かれているが、Python用のラッパーライブラリがある。
Python Imaging Library(PIL)というライブラリもあるが、OpenCVのほうがNumPyとの相性がいい。
使用する際はimport cv2
でインポートする。
# OpenCVのimport
import cv2
▶cv2.imread('path')
path
で指定した画像をndarrayに変換して読み込むことができる。
この時のそれぞれの要素の値は[blue(B)、green(G), red(R)]
を表しており、それぞれの数値は輝度値(0~255)を表している。
読み込んだ画像はplt.imshow()
で表示することができる。
しかしこの時、BGRではなくRGBとして表示しようとするので、順序を変える必要がある。
cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
でBGRからRGBに変換できる。
# 画像ファイルをndarrayで読み込み (BGRで読み込み)
im = cv2.imread('lenna.png')
# ndarrayを画像として表示 (RGBで表示しようとするので、このままでは色が変になる)
plt.imshow(im)
# BGR->RGB
rgb = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
plt.imshow(rgb)
# Slicingで画像の切り取りも可能
# crop
im = im[50:-50, 50:-50, :]
plt.imshow(im)
▶cv2.imwrite('path', image)
path
にimage
を保存する。
この時はBGRとして保存されるので、cv2.cvtColor()
でRGBに変換していると、保存後のファイルの色がおかしくなるので、cv2.imwrite()
で画像を保存する際はBGRで保存するようにする。
# 画像の保存
# 保存する際はBGRで保存する
cv2.imwrite('sample_cv2.png', im)
▶Binarization(2値化)
ある閾値を指定して、その値より大きいか小さいかで値を2極化する手法。
画像であれば、「白か黒」のみで出力するなどの形となる。
2値化の手法にはいくつかある。
- 閾値を指定して2値化する。
- 「大津の2値化(Otsu's binarization)」を利用する。
⇒自動でいい感じの閾値を指定してくれる。 - Adaptive Thresholder
⇒ある小さな区画ごとに区切って、その区画ごとに閾値を指定する。
など...
・cv2.threshold()
cv2.threshold(image, 閾値, 閾値以上の時に設定する値, cv2.THRESH_BINARY)
で2値化を行うことができる。
返り値は(閾値, ndarray)
のタプルで返却される。
# 今回は白黒画像に対して2値化を行う
im = cv2.imread('lenna.png')
gray = cv2.cvtColor(rgb, cv2.COLOR_BGR2GRAY)
# 輝度値が127未満は0, それ以上は255
ret, thresh = cv2.threshold(gray, 127, 255, cv2.THRESH_BINARY)
plt.imshow(thresh, cmap='gray')
・Otsu's binarization
双峰性(bimodal)のあるヒストグラムに対して使用できる。
線形判別分析法(LDA:Linear Discriminant Analysis)を画像に適用したアルゴリズム
cv2.threshold(image,0,255,cv2.THRESH_BINARY+cv2.THRESH_OTSU)
で処理をしてくれる。
※双峰性ヒストグラム
# 輝度値のヒストグラムがbimodalの時に有効.自動で最適な閾値を設定
# gray scale
im = cv2.imread('bimodal_sample.png')
gray = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)
plt.imshow(gray, cmap='gray')
# Otsu binarization
ret, thresh = cv2.threshold(gray,0,255,cv2.THRESH_BINARY+cv2.THRESH_OTSU)
plt.imshow(thresh, cmap='gray')
自動設定された閾値をヒストグラムで確認してみる
# ヒストグラムと閾値
sns.distplot(gray.flatten())
# ヒストグラムに線を引くことができる
plt.vlines(ret, 0, 0.01, 'r') # (値, y軸の最小, y軸の最大, '色')
・cv2.adaptiveThreshold()
光の条件によって不安定な明暗が存在し、うまく2値化できない画像に使用する。
文字の識別を行う場合によく使用される。
基本的にある区画(window)内の輝度値の平均から定数を引いた値を閾値とする。
※平均では文字などがない数値の変化が乏しい区画ではノイズが混じってしまうから
cv2.adaptiveThreshold(image, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, window_size, 定数)
で処理が可能。
返り値はndarray
のみ
# gray scale
im = cv2.imread('text_pic.jpg')
gray = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)
plt.imshow(gray, cmap='gray')
# Adaptive Threshold
# kernel(window)サイズは15x15,
thresh = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY,15,4)
plt.imshow(thresh, cmap='gray')
■ファイルパスの取得(glob)
以降の資料では下記サイトからダウンロードできるデータセットを使用するので、必要な方は事前にダウンロードして、/work/public-covid-data/
配下に格納しておく。
https://medicalsegmentation.com/covid19/
※赤く囲まれた2つのファイルをダウンロード
▶globとは
Pythonに元から入っているライブラリで、import glob
またはfrom glob import glob
でインポートし、使用可能。
前者であればglob.glob('ワイルドカード')
で、後者であればglob('ワイルドカード')
で一致するファイルパスを取得することができる。
※'ワイルドカード'
は*
や?
などがあるが、詳しくは検索していただきたい。
# globのインポート
# import glob
from glob import glob
#引数のパターンにマッチするファイルパスのリストを取得
# * : ワイルドカード(0文字以上の任意の文字列)
glob('*')
# .pngのみ
glob('*.png')
# 5~9で始まるファイルのみ
glob('[5-9]*')
■ファイルパスを安全に操作する(os & pathlib)
上記のglob
は個人でpathを扱う程度であれば問題ないのだが、何か製品の開発を行う際は文字列で扱うのは不都合が生じる場合がある。(WindowsとMacではファイルパスが少し異なるなど)
そのため、特に理由がなければ次のモジュールを使用するのが望ましい。
▶pathlibモジュール
from plathlib import Path
でインポートする。
Path('path')
で、指定したpathのPathオブジェクトを作成できる。
Pathオブジェクトに.iterdir()
を使用することでイテレーターを作成することができ、next(イテレーター)
とすることで、一つ下のディレクトリのPathオブジェクトを順番に取得することができる。
list(イテレーター)
とすることで、現在のディレクトリのPathオブジェクトをリストとして取得可能。
list(イテレーター.glob('ワイルドカード'))
とすることで、特定のPathオブジェクトの取得も可能。
# pathlib.Pathのインポート
from pathlib import Path
#pathのインスタンス作成
p = Path('public-covid-data')
# next(イテレーター)で一つ下の階層のPathオブジェクトを取得
next(p.iterdir()) # ⇒ PosixPath('public-covid-data/rp_im')
next(p.iterdir()) # ⇒ PosixPath('public-covid-data/rp_msk')
# Pathオブジェクトをlistで取得
list(p.iterdir()) # ⇒ [PosixPath('public-covid-data/rp_im'), PosixPath('public-covid-data/rp_msk')]
# rp_im以下のpathオブジェクト
sub_p = list(p.iterdir())[1]
list(sub_p.iterdir()) # ⇒ [PosixPath('public-covid-data/rp_im/5.nii.gz'), PosixPath('public-covid-data/rp_im/9.nii.gz'), ..., PosixPath('public-covid-data/rp_im/2.nii.gz')]
# p.glob()でパターンでマッチングできる
path = list(sub_p.glob('*[6-9]*')) # ⇒ [PosixPath('public-covid-data/rp_im/9.nii.gz'), PosixPath('public-covid-data/rp_im/7.nii.gz'), PosixPath('public-covid-data/rp_im/8.nii.gz'), PosixPath('public-covid-data/rp_im/6.nii.gz')]
▶osモジュール
import os
でインポートする。
pathlibで取得したファイルパス情報を分割したり、結合したりすることができる。
os.path.split(Pathオブジェクト)
とすることで、head(フォルダパス)とtail(ファイル名)をタプル型で返却する。
os.path.join(head, tail)
でファイルパスを作成可能
target_file = path[0]
# head(フォルダパス)とtail(ファイル名)にsplit
folder_p, file_name = os.path.split(target_file)
print(folder_p, file_name) # ⇒ public-covid-data/rp_im 9.nii.gz
# フォルダパスとファイル名を連結させる
os.path.join(folder_p, 'test.txt') # ⇒ 'public-covid-data/rp_im/test.txt'
▶フォルダの作成
何か処理の途中で中間データを保存したいということは多々あるので、その時に保存するファルダをあらかじめ作成することはよくある。
os.makedirs('path')
でpathのディレクトリを作成することができる。
もし、すでに指定したディレクトリが存在する場合はエラーになってしまうので、先にos.path.exists('path')
で存在確認を行う必要がある。
# パスの文字列を構成
p = Path('public-covid-data')
new_folder_name = 'new_folder'
new_folder_path = os.path.join(p, new_folder_name)
# すでに存在するか確認し、なければ作成
if not os.path.exists(new_folder_path):
# 指定したフォルダを作成する
os.makedirs(new_folder_path)
Challenge
/public-covid-data/
配下のフォルダやファイルについて以下の条件で、DataFrameを作成
- column: path_rm、 filename、 path_msk
path_im | filename | path_msk | |
---|---|---|---|
0 | public-covid-data/rp_im/5.nii.gz | 5.nii.gz | public-covid-data/rp_msk/5.nii.gz |
⋮ | ⋮ | ⋮ | ⋮ |
という感じ
解答例
# pathの取得
p = Path('public-covid-data')
df_list = []
# リストの作成
for folder in p.iterdir():
# リスト内包表記
# .as_posix() -> Pathオブジェクトを文字列で取得
path_list = [p.as_posix() for p in list(folder.iterdir())]
file_list = [os.path.split(p)[1] for p in list(folder.iterdir())]
# DataFrameを作成し、リストに格納
df_list.append(pd.DataFrame({'path': path_list, 'filename': file_list}))
# dfをマージ
df = df_list[0].merge(df_list[1], on='filename', suffixes=('_im', '_msk'))
path_im | filename | path_msk | |
---|---|---|---|
0 | public-covid-data/rp_im/5.nii.gz | 5.nii.gz | public-covid-data/rp_msk/5.nii.gz |
1 | public-covid-data/rp_im/9.nii.gz | 9.nii.gz | public-covid-data/rp_msk/9.nii.gz |
2 | public-covid-data/rp_im/7.nii.gz | 7.nii.gz | public-covid-data/rp_msk/7.nii.gz |
3 | public-covid-data/rp_im/3.nii.gz | 3.nii.gz | public-covid-data/rp_msk/3.nii.gz |
4 | public-covid-data/rp_im/1.nii.gz | 1.nii.gz | public-covid-data/rp_msk/1.nii.gz |
5 | public-covid-data/rp_im/4.nii.gz | 4.nii.gz | public-covid-data/rp_msk/4.nii.gz |
6 | public-covid-data/rp_im/8.nii.gz | 8.nii.gz | public-covid-data/rp_msk/8.nii.gz |
7 | public-covid-data/rp_im/6.nii.gz | 6.nii.gz | public-covid-data/rp_msk/6.nii.gz |
8 | public-covid-data/rp_im/2.nii.gz | 2.nii.gz | public-covid-data/rp_msk/2.nii.gz |
■プログレスバーの表示
▶tqdm
for文などのループ処理を行う際に今どこまで処理が完了しているかを表示してくれるライブラリ。
from tqdm import tqdm
でインポートする。
for i in tqdm(list)
とすることで、ループ処理のプログレスバーを表示可能。
このままだとプログレスバーを何行も出力するので、1行のみで表示したい場合は引数にposition=0
を指定する。
DataFrameなどのプログレスバーを表示したい場合は、引数にtotal=len(df)
を指定する。
# import
from tqdm import tqdm
# tqdmの戻り値をそのままfor文で回す
sum = 0
for i in tqdm(range(1e7), position=0):
sum += i
# DataFrameのプログレスバーの表示
# 前述のChallengeで作成したdfを使用している
for idx, row in tqdm(df.iterrows(), total=len(df)):
print('image path for {} is here: {}'.format(row['path_im'], row['filename']))
-> 100%|██████████| 9/9 [00:00<00:00, 1624.58it/s]
image path for public-covid-data/rp_im/5.nii.gz is here: 5.nii.gz
image path for public-covid-data/rp_im/9.nii.gz is here: 9.nii.gz
image path for public-covid-data/rp_im/7.nii.gz is here: 7.nii.gz
image path for public-covid-data/rp_im/3.nii.gz is here: 3.nii.gz
image path for public-covid-data/rp_im/1.nii.gz is here: 1.nii.gz
image path for public-covid-data/rp_im/4.nii.gz is here: 4.nii.gz
image path for public-covid-data/rp_im/8.nii.gz is here: 8.nii.gz
image path for public-covid-data/rp_im/6.nii.gz is here: 6.nii.gz
image path for public-covid-data/rp_im/2.nii.gz is here: 2.nii.gz
■CTデータを扱う
▶nibanel
NIfTIをPythonで扱う際に使用するライブラリ。
NIfTIは主に脳のMRI画像やCT画像を扱う際に使用されるデータ形式。
使用する際はimport nibabel as nib
とインポートする。
nib.load('path')
でpathのNIfTIファイルをロードすることができる。
ロードしたファイルは.get_fdata()
で3dのndarrayとして取得することができる。
# インポート
import nibabel as nib
# NIfTIファイルのロード
im_nifti = nib.load(df['path_im'].iloc[0])
# データをndarryとして取得
data = im_nifti.get_fdata()
# 今回のデータは反時計回りに90°倒れてしまっているので、.transpose()でreshapeする
data = np.transpose(data)
plt.imshow(data[20, :, :], cmap='gray')
Challenge
前回のChallengeで作成したdfにそれぞれのNIfTIファイルのSlice数を格納したカラムを追加する。
※Slice数は画像ファイルのz(奥行)に当たるデータで、今回で言えばtranspose後の1dのデータに当たる。
[条件]
- カラム名:'slice_num'
- CT画像の取得からtransposeまでを行う関数の作成
- tqdmを使用して、完了までの経過を表示できるようにする
解答例
# loadしてnumpy arrayを取り出す関数
def load_nifti(nifti_path):
im_nifti = nib.load(nifti_path)
data = im_nifti.get_fdata()
return np.transpose(data)
# slice_numカラムの作成
for idx, row in tqdm(df.iterrows(), total=len(df)):
data = load_nifti(row['path_im'])
# data shape is (z, h, w)
slice_num = data.shape[0]
df.loc[idx, 'slice_num'] = slice_num
■並列処理
▶map()
map(func, iterable)
でイテラブル(listなど)な変数に対して、要素を一つずつ取り出して関数funcを適用するイテレーターを返すPythonのBuilt in Function。
返り値はイテレーターなので、next()
で要素を取り出したり、list()
でリストを取得することができる。
普段はリスト内包表記でもかけるのでmap()
は使用しないが、並列処理の場合はよく活用される。
def square(n):
return n ** 2
# map()
list = [1, 2, 3, 4]
list(map(square, list)) # ⇒ [1, 4, 9, 16]
# 返り値はイテレーター
iter = map(square, list)
next(iter) # ⇒ 1
next(iter) # ⇒ 4
▶multiprocessing
使用する際はfrom multiprocessing import Pool, cpu_count
でインポートする。
cpu_count()
は現在使用できる物理cpuの数を返す。
p = Pool(processing=num)
で使用する物理cpuの数を指定できる。
このp
に対して、様々な関数を適用することで、指定したnum
の数分、並列処理を行うことができる。
並列処理が終了した際はp.close()
とp.join()
で並列処理を終了する必要がある。
・p.map()
p.map(func, iter)
でiter
に含まれる要素に対して、順番にfunc
を適用するのではなく、可能な限り、同時に処理を行うことができる。
全ての処理が終了した時点で、その処理の結果をlistで返す。
・p.imap()
p.imap(func, iter)
で並列処理を行い、すべての処理が完了するのを待たずに、処理が終わった要素から結果を返却するイテレーターを返す。
イテレーターを返すので、for文でその後の処理を回すこともできる。
しかし、iterの要素の順番は順守されるので、要素3の処理が終了したとしても、要素2の処理が終了するまで、イテレーターが値を返すことができない。
・p.imap_unordered()
.imap()
と仕様はほぼ同じだが、iterの要素の順番の制約はなくなり、処理が終了した時点で結果を返すイテレーターとなる。
順番に縛られないので、.imap()
よりそれぞれの要素の処理速度は速くなる。
from multiprocessing import Pool, cpu_count
import time
# pインスタンスを作成 (使用するCPUの物理コア数を指定)
p = Pool(processes=cpu_count()-1)
# 引数の値分処理を停止する関数
def wait_sec(sec):
time.sleep(sec)
return sec ** 2
# 並列処理なし
before = time.time()
results = map(wait_sec, [1, 5, 3])
after = time.time()
print('it took {} sec'.format(after-before))
# ⇒ it took 9.009947299957275 sec (1 + 5 + 3で9秒かかる)
# .map()で並列処理
before = time.time()
results = map(wait_sec, [1, 5, 3])
after = time.time()
print('it took {} sec'.format(after-before))
# ⇒ it took 5.009947299957275 sec (それぞれ同時に処理するので、最大の5秒となる)
# .imap()はイテレーターを返す
start = time.time()
for i in p.imap(wait_sec, [1, 5, 3]):
print('{}: {} sec'.format(i, time.time() - start))
# ⇒ 1: 1.0047399997711182 sec
# 25: 5.013734579086304 sec
# 9: 5.0174171924591064 sec
# .imap_unordered()は処理が終わり次第返す
for i in p.imap_unordered(wait_sec, [1, 5, 3]):
print('{}: {} sec'.format(i, time.time() - start))
# ⇒ 1: 1.0047399997711182 sec
# 9: 3.0104171924591064 sec
# 25: 5.013734579086304 sec
# 並列処理が終了したら閉じる
p.close()
p.join()
・複数引数を指定したい場合はラッパー関数を使用する。
.map()
や.imap()
は引数を1つしか指定できないため、複数引数を指定したい場合はラッパー関数を作成する必要がある。
この時zip()
関数が有用。
zip()
:複数のリストのそれぞれの要素をindex毎にタプルにして返すイテレーターを作成。
# 複数の引数を入れる場合はラッパー関数を作る
# 2つの引数をとる関数
def multiply(a, b):
return a * b
params1 = [1, 2, 3, 4]
params2 = [10, 30, 70, 20]
# ラッパー関数を作る
def wrap_multiply(args):
# *はunpack演算子
return multiply(*args)
# zipでタプルのリストを作成 (イテレータを返す)
job_args = list(zip(params1, params2))
# 並列処理
p = Pool(processes=cpu_count()-1)
results = p.imap(wrap_multiply, job_args)
p.close()
p.join()
list(results) # ⇒ [10, 60, 210, 80]
▶別ファイルのPythonスクリプトをJupyterLabで呼び出す
今まではすべてJupyterLabに直書きしてきたが、基本的にはPythonスクリプトを別ファイルで定義し、それをJupyterLabで呼び出す。
その時、Pythonを編集したときの変更を自動的に反映してくれる設定をすると楽
%load_ext autoreload
と%autoreload 2
を記述するだけでOK。
def multiply(a, b):
return a * b
# 変更したモジュールが反映されるようにする
# 'autoreload'というextensionをloadする
%load_ext autoreload
# スクリプトを実行するたびに毎回reloadするように設定
%autoreload 2
import util
print(util.multiply(3, 4)) # ⇒ 12