5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

estieAdvent Calendar 2019

Day 14

踏み台経由RDSのデータをPandasでごにょごにょする

Last updated at Posted at 2019-12-14

estie Advent Calendar 2019 14日目の記事になります。
是非他の記事もご覧くださいね :laughing:

#はじめに
こんにちは、estie.incでエンジニアやってます、marushoです。
estieでは「テクノロジーの力で、世界を自由に、楽しく。」を合言葉に、不動産分野の「めんどくさい」を解消するため

  • オフィス探しサービス estie
  • 不動産データの可視化サービス estiepro

を運営しており、日々更新される不動産データを分析し、新たな価値の創出にチャレンジしています。

データ分析や分析結果の反映をスピーディに行うためには、セキュリティ構成を担保しつつ気軽にDBへアクセスする必要があります。
弊社ではpandas<->DBのやりとりが頻繁に行われるのですが、一度csvファイルに変換したり踏み台サーバに入ったりするのは何かと時間を消費してしまいます。

ということで、
今回は踏み台経由でDBのデータを、pandasで基本的なCRUD操作を直接やってみます。

#環境
DBはprivateなサブネットにいて踏み台サーバを経由しないとアクセスできない、というよくある環境を前提とします。
今回はAWS上のEC2/RDS(MySQL5.7)で動作させています。

ちなみにローカル環境は

  • MacOS Mojave
  • Python 3.6.8
  • Pandas 0.24.1

##必要なパッケージをインストール
PythonでDB情報を扱うので、定番ORMのSQLAlchemyを使います。
また、MySQLのドライバと踏み台にSSHを張るためのSSHtunnelもインストールします

$ pip install SQLAlchemy PyMySQL sshtunnel

##SSH config
普段sshに接続するために、.ssh/configにHostを登録しておくことが多いと思います。
今回もsshtunnelでconfigに書かれたHost情報を利用するので、以下のように踏み台の接続情報を書き込んでおきます。

~/.ssh/config
Host rds_bastion
    Hostname [踏み台IP]
    Port 22
    User [UserName]
    IdentityFile ~/.ssh/[KeyName]

#RDSに接続

まずはmoduleのimportと、DBの接続に必要な情報を書いておきます


import pandas as pd
import sqlalchemy as sa
from sshtunnel import SSHTunnelForwarder

DB_USER = 'test_user' # DBのユーザー名
DB_PASS =  'db_passward' # DBのパスワード
ENDPOINT = 'hogehoge.fugafuga.ap-northeast-1.rds.amazonaws.com' # RDSエンドポイント
PORT = 3306 # ポート
DB_NAME = 'test_db' # DB名
CHARSET = 'utf8' # 文字コード

次にSSHポートフォワードを使って、踏み台越しのDBに接続します。


server = SSHTunnelForwarder(ssh_address_or_host = 'rds_bastion',
                            ssh_config_file = '~/.ssh/config',
                            remote_bind_address=(ENDPOINT,PORT))
server.start()

接続を終了するときはcloseしましょう


server.close()

sshを接続した状態で、SQLAlqhemyのエンジンを取得します。


#SQLAlchemyの接続URLを生成
URL = f"mysql+pymysql://{DB_USER}:{DB_PASS}@127.0.0.1:{server.local_bind_port}/{DB_NAME}?charset={CHARSET}"

#engineの取得
engine = sa.create_engine(URL)

このengineを使ってPandasでのデータ操作をやっていきます

Pandasでごにょごにょする

さて、本題です。
pandasでcreate,read,update,delete操作ができるか試してみましょう。

サンプルとして、DB名test_dbにmembersテーブルを作成しておきます

MySQL [test_db]> SELECT * FROM members;
+----+------------------+-----+
| id | name             | age |
+----+------------------+-----+
|  1 | 雪村 あおい       | 15  |
|  2 | 倉上 ひなた       | 15  |
|  3 | 斎藤 楓          | 16  |
|  4 | 青羽 ここな       | 13  |
+----+------------------+-----+

##Read:読み込み
まずはpandas.read_sqlを使ってmembersテーブルをDataFrameとして読み込んでみましょう

テーブル全てのデータを読み込む場合は、テーブル名を指定します

df = pd.read_sql('members', engine)
id name age
0 1 雪村 あおい 15
1 2 倉上 ひなた 15
2 3 斎藤 楓 16
3 4 青羽 ここな 13

綺麗に読み込めてますね

indexカラムの指定や、取得したいカラム名をリスト指定することもできます。

df= pd.read_sql('members', engine, index_col='id', columns=['name'])
id name
1 雪村 あおい
2 倉上 ひなた
3 斎藤 楓
4 青羽 ここな

もちろんSQLクエリでレコード指定することも可能です。

df= pd.read_sql('SELECT * FROM members WHERE id = 2', engine)
id name age
1 2 倉上 ひなた 15

##Create:テーブル作成
to_sqlを使ってDataFrameのデータから新しいテーブルを作成できます。
(DataFarameの)indexの有無や、どれをindexとして取り込むかの指定もできます。


df = pd.read_sql('SELECT * FROM members WHERE age < 14', engine)
df.to_sql('jc_members', engine, index=False, index_label='id')
MySQL [test_db]> select * from jc_members;
+------+------------------+------+
| id   | name             | age  |
+------+------------------+------+
|    4 | 青羽 ここな      | 13   |
+------+------------------+------+

##Insert:レコードの挿入/更新
こちらもto_sqlで実行できますが、
if_existオプションで挙動が異なるので注意が必要です。

if_exist=appendとすると、新しいレコードとして追加し、同じレコードがあった場合はエラーになります。


insert_df = pd.DataFrame({'id':['5'],'name' : ['黒崎 ほのか'],'age':['14']})
insert_df.to_sql('members', engine, index=False, index_label='id', if_exists='append')
id name age
1 雪村 あおい 15
2 倉上 ひなた 15
3 斎藤 楓 16
4 青羽 ここな 13
5 黒崎 ほのか 14

INSERTとおなじ挙動ですね。ちゃんと追加されています。

しかしif_exist=replaceとすると、指定テーブルのデータをすぺてdeleteして、DataFrameを追加します。


insert_df = pd.DataFrame({'id':['5'],'name' : ['黒崎 ほのか'],'age':['14']})
insert_df.to_sql('members', engine, index=False, index_label='id', if_exists='replace')
id name age
5 黒崎 ほのか 14

UPDATEでもUPSERTでもなく、はたまたREPLACEとも異なる挙動なので注意が必要です!

特定レコードだけ更新する、などの操作はまだto_sqlに実装されいないようです。
今回は割愛しますが、SQLAlchemyのupsertを使う方法や、to_sqlmethodオプションでSQLの挙動を変更するやり方があるようなので、試してみようと思います。

##Delete:レコード/テーブルの削除
read_sqlでdrop/delete操作をするとreturnが無くエラーになるのですが、
実はDB側には削除操作が実行されてしまいます。

pd.read_sql('DROP TABLE members', engine)
MySQL [test_db]> SELECT * FROM members;
ERROR 1146 (42S02): Table 'test_db.members' doesn't exist

これは本来の用途ではないので、delete操作を行うときは素直にsqlalchemyでのクエリ実行をお勧めします

engine.execute('DROP TABLE members')

#おわりに
離れたDBの情報を手軽にDataFrameにできるのは魅力ですね。
更新系のメソッドはかゆいところに手が届いてない感じなので、今後のpandasの発展を注視したいと思います。


estieではWebエンジニアを募集しています!
Wantedly
お気軽にオフィスに遊びに来てくださいね!

5
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?