estie Advent Calendar 2019 14日目の記事になります。
是非他の記事もご覧くださいね
#はじめに
こんにちは、estie.incでエンジニアやってます、marushoです。
estieでは「テクノロジーの力で、世界を自由に、楽しく。」を合言葉に、不動産分野の「めんどくさい」を解消するため
を運営しており、日々更新される不動産データを分析し、新たな価値の創出にチャレンジしています。
データ分析や分析結果の反映をスピーディに行うためには、セキュリティ構成を担保しつつ気軽にDBへアクセスする必要があります。
弊社ではpandas<->DBのやりとりが頻繁に行われるのですが、一度csvファイルに変換したり踏み台サーバに入ったりするのは何かと時間を消費してしまいます。
ということで、
今回は踏み台経由でDBのデータを、pandasで基本的なCRUD操作を直接やってみます。
#環境
DBはprivateなサブネットにいて踏み台サーバを経由しないとアクセスできない、というよくある環境を前提とします。
今回はAWS上のEC2/RDS(MySQL5.7)で動作させています。
ちなみにローカル環境は
- MacOS Mojave
- Python 3.6.8
- Pandas 0.24.1
##必要なパッケージをインストール
PythonでDB情報を扱うので、定番ORMのSQLAlchemyを使います。
また、MySQLのドライバと踏み台にSSHを張るためのSSHtunnelもインストールします
$ pip install SQLAlchemy PyMySQL sshtunnel
##SSH config
普段sshに接続するために、.ssh/configにHostを登録しておくことが多いと思います。
今回もsshtunnelでconfigに書かれたHost情報を利用するので、以下のように踏み台の接続情報を書き込んでおきます。
Host rds_bastion
Hostname [踏み台IP]
Port 22
User [UserName]
IdentityFile ~/.ssh/[KeyName]
#RDSに接続
まずはmoduleのimportと、DBの接続に必要な情報を書いておきます
import pandas as pd
import sqlalchemy as sa
from sshtunnel import SSHTunnelForwarder
DB_USER = 'test_user' # DBのユーザー名
DB_PASS = 'db_passward' # DBのパスワード
ENDPOINT = 'hogehoge.fugafuga.ap-northeast-1.rds.amazonaws.com' # RDSエンドポイント
PORT = 3306 # ポート
DB_NAME = 'test_db' # DB名
CHARSET = 'utf8' # 文字コード
次にSSHポートフォワードを使って、踏み台越しのDBに接続します。
server = SSHTunnelForwarder(ssh_address_or_host = 'rds_bastion',
ssh_config_file = '~/.ssh/config',
remote_bind_address=(ENDPOINT,PORT))
server.start()
接続を終了するときはcloseしましょう
server.close()
sshを接続した状態で、SQLAlqhemyのエンジンを取得します。
#SQLAlchemyの接続URLを生成
URL = f"mysql+pymysql://{DB_USER}:{DB_PASS}@127.0.0.1:{server.local_bind_port}/{DB_NAME}?charset={CHARSET}"
#engineの取得
engine = sa.create_engine(URL)
このengineを使ってPandasでのデータ操作をやっていきます
Pandasでごにょごにょする
さて、本題です。
pandasでcreate,read,update,delete操作ができるか試してみましょう。
サンプルとして、DB名test_db
にmembersテーブルを作成しておきます
MySQL [test_db]> SELECT * FROM members;
+----+------------------+-----+
| id | name | age |
+----+------------------+-----+
| 1 | 雪村 あおい | 15 |
| 2 | 倉上 ひなた | 15 |
| 3 | 斎藤 楓 | 16 |
| 4 | 青羽 ここな | 13 |
+----+------------------+-----+
##Read:読み込み
まずはpandas.read_sql
を使ってmembersテーブルをDataFrameとして読み込んでみましょう
テーブル全てのデータを読み込む場合は、テーブル名を指定します
df = pd.read_sql('members', engine)
id | name | age | |
---|---|---|---|
0 | 1 | 雪村 あおい | 15 |
1 | 2 | 倉上 ひなた | 15 |
2 | 3 | 斎藤 楓 | 16 |
3 | 4 | 青羽 ここな | 13 |
綺麗に読み込めてますね
indexカラムの指定や、取得したいカラム名をリスト指定することもできます。
df= pd.read_sql('members', engine, index_col='id', columns=['name'])
id | name |
---|---|
1 | 雪村 あおい |
2 | 倉上 ひなた |
3 | 斎藤 楓 |
4 | 青羽 ここな |
もちろんSQLクエリでレコード指定することも可能です。
df= pd.read_sql('SELECT * FROM members WHERE id = 2', engine)
id | name | age | |
---|---|---|---|
1 | 2 | 倉上 ひなた | 15 |
##Create:テーブル作成
to_sql
を使ってDataFrameのデータから新しいテーブルを作成できます。
(DataFarameの)indexの有無や、どれをindexとして取り込むかの指定もできます。
df = pd.read_sql('SELECT * FROM members WHERE age < 14', engine)
df.to_sql('jc_members', engine, index=False, index_label='id')
MySQL [test_db]> select * from jc_members;
+------+------------------+------+
| id | name | age |
+------+------------------+------+
| 4 | 青羽 ここな | 13 |
+------+------------------+------+
##Insert:レコードの挿入/更新
こちらもto_sql
で実行できますが、
if_exist
オプションで挙動が異なるので注意が必要です。
if_exist=append
とすると、新しいレコードとして追加し、同じレコードがあった場合はエラーになります。
insert_df = pd.DataFrame({'id':['5'],'name' : ['黒崎 ほのか'],'age':['14']})
insert_df.to_sql('members', engine, index=False, index_label='id', if_exists='append')
id | name | age |
---|---|---|
1 | 雪村 あおい | 15 |
2 | 倉上 ひなた | 15 |
3 | 斎藤 楓 | 16 |
4 | 青羽 ここな | 13 |
5 | 黒崎 ほのか | 14 |
INSERTとおなじ挙動ですね。ちゃんと追加されています。
しかしif_exist=replace
とすると、指定テーブルのデータをすぺてdeleteして、DataFrameを追加します。
insert_df = pd.DataFrame({'id':['5'],'name' : ['黒崎 ほのか'],'age':['14']})
insert_df.to_sql('members', engine, index=False, index_label='id', if_exists='replace')
id | name | age |
---|---|---|
5 | 黒崎 ほのか | 14 |
UPDATEでもUPSERTでもなく、はたまたREPLACEとも異なる挙動なので注意が必要です!
特定レコードだけ更新する、などの操作はまだto_sqlに実装されいないようです。
今回は割愛しますが、SQLAlchemyのupsertを使う方法や、to_sql
のmethodオプションでSQLの挙動を変更するやり方があるようなので、試してみようと思います。
##Delete:レコード/テーブルの削除
read_sql
でdrop/delete操作をするとreturnが無くエラーになるのですが、
実はDB側には削除操作が実行されてしまいます。
pd.read_sql('DROP TABLE members', engine)
MySQL [test_db]> SELECT * FROM members;
ERROR 1146 (42S02): Table 'test_db.members' doesn't exist
これは本来の用途ではないので、delete操作を行うときは素直にsqlalchemyでのクエリ実行をお勧めします
engine.execute('DROP TABLE members')
#おわりに
離れたDBの情報を手軽にDataFrameにできるのは魅力ですね。
更新系のメソッドはかゆいところに手が届いてない感じなので、今後のpandasの発展を注視したいと思います。
estieではWebエンジニアを募集しています!
Wantedly
お気軽にオフィスに遊びに来てくださいね!