こんなケースに
例えば、
下記のようなemployee情報が含まれたCSVをDBに取り込む
従業員氏名,生年月日,会社コード,会社名
tanaka tarou,2000-12-20,aaa,aaa会社
tanaka jirou,1970-08-05,bbb,bbb会社
suzuki tarou,1987-10-15,aaa,aaa会社
suzuki jirou,1975-01-31,ccc,ccc会社
///以下一万行以上
DBには
下記のようなサロゲートキーを外部キーとしたリレーションを持つemployees,companiesテーブルが存在する。
employeesテーブル
従業員情報を格納するテーブル
列名 | 型 | 備考 |
---|---|---|
id | int | オートインクリメントID |
name | string | 従業員氏名 |
birth_date | date | 生年月日 |
company_id | int | 会社ID 外部キーとしてcopaniesテーブルのidを参照 |
companiesテーブル
会社情報のマスター
元々1万件のデータが存在する
列名 | 型 | 備考 |
---|---|---|
id | int | オートインクリメントID |
code | string | 会社コード 一意キー制約 |
name | string | 会社名 |
データベースの整合性を保つために、
会社コードで会社IDを引き当てて、
employeesテーブルに保存しないといけない。
元々こうやっていた
-
一回一回DBにアクセスして取ってくる
当然一回一回SQL文発行するのでその分大幅に時間がかかります。
サーバーがIDCFクラウドの一番安い構成の貧弱スペックのため、
数万行のデータに一回一回やっていたら2日かかるんじゃないかってぐらい遅かったので
次の方法に。 -
キャッシュに取得しておいて、都度全要素を比較
company = [company for company in companies if company['code']=='aaa'][0]
DBから取得したデータをリストで持っておき、for文で全要素を回して取得。
companyが一万行ぐらいだと一行一行SQL文発行するのと同じくらい時間がかかった。
最終的にcsvのデータをuniquekeyで昇順に並べ替えて、DBからuniquekeyを昇順で200ずつ取ってきて、比較、200件比較し終えたら次の200件・・・・みたいにしていた。
それでも5分以上かかっている。
なんて無駄なことをしていたんだろう。
dictionaryを使う
一意キーをkeyにしたdictionaryに変換して値を取得する
from operator import itemgetter
import csv
import dataset
def get_last_id(dic):
# companiesをid降順に並び変えて一番目の要素のidを返す
last_item = sorted(dic.values(),key=itemgetter('id'),reverse=True)[0]
return last_item['id']
# DBからcompanyテーブル全データをdict型で取得する
# dataset等を使うとテーブルデータを辞書型で取得できる
db = dataset.Database('connectionurl')
companies_tbl = db['companies']
companies = companies_tbl.all()
# 一意キーであるcodeをkeyとしてdictionaryを作成
companies_dict = dict(map(lambda company:(company['code'],company),companies))
# emploee.csvを読み込む
csv_file = open('emploee.csv','r',encoding='utf8',newline='')
reader = csv.reader(csv_file , delimiter=',',lineterminator="\r\n",skipinitialspace=True)
employees=[]
for field in reader:
# companyを比較する
company = companies_dict.setdefault(field[2],{'code':field[2],'name':field[3],'id':(get_last_id(companies_dict)+1)})
print(company)
employees.append({'name':field[0],'birth_date':field[1],'company_id':company['id']})
# companiesへ登録
for company in companies_dict.values():
companies_tbl.upsert(company,['id','code'])
# employeesへ登録
employees_tbl = db['employees']
for employee in employees:
employees_tbl.insert(employee)
これだけ。
3万件弱取り込むのに5分くらいかかっていたのが5秒20秒くらいで終わった。
※setdefaultの場合は20秒ぐらい、5秒になったのは下記にちょろっと書いたKeyErrorを使用した時
実際に使うときはmax_idのインクリメントを考慮に入れて、
追加する都度dictionary中の最大のIDを取ってくるようにする必要があります。
※安全性を考慮すれば上記のようにdict.setdefault関数を使うべき。
しかしあえてKeyErrorを発生させて例外処理ステートメントで新しく要素を追加する方法だと自分の環境ではさらに速くなった。また別記事で書く。
※DBプロバイダーがmysqlの場合、デフォルトで大文字小文字を区別しないので、例えば
AAA
,aaa
のような会社コードがあった場合、dicitionaryでは別のキーとして登録されるが、
mysqlでは同じものとして認識するため、DB登録時に不整合が発生する。
DB側の定義を変えるか、大文字小文字を区別しないdictionaryを使う
#まとめ
比較する側に一意キーがないといけないので
あまり使用する場面はないかもしれないけど、データを大量にとってきて比較したい場合に役に立ちそうだと思ったのでシェアです。
取り込みだけじゃなく、手動でのデータ入力で新規登録する時に整合性を保つためにも使えるかもしれません。
この方法を発見したことで、
今までわかってなかったKVSの考え方をなんとなくつかめた気がします。