5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

PythonでSSH経由し外部のMySQLデータベースを操作してみる

Posted at

ローカルで外部のデータベースを操作したい時に、いつもSequel proなどのツールでSQL文を実行するようにしていますが、動的にSQL文を作成したいや結果確認も一気にやりたい時になると、少し不便かなーと感じています。
Pythonを使ってみたらどうかなと思い、かつ最近もJupyterを試してみたいので、今回はJupyterを使って、SSH経由でMySQLデータベースを操作してみまして、その方法をメモしました。

やりたいこと

  • 踏み台(bastion)経由してMySQL DBに接続する
  • 更新対象のレコードを抽出し、更新SQL文を動的に作成する
  • データ更新を行い、進捗を表示する
  • 確認しやすくため、更新されたレコードの更新前の情報を保持する

環境

今回は、
PC:MacBook pro
OS:macOS 10.15.2
Jupyter(python3)で試してみました。

実装

流れとしては

  • 必要なmoduleを用意
  • 検索関数、更新関数、進捗表示関数の作成
  • 各関数の実行
  • 結果の表示

必要なmoduleを用意

まず、必要なmoduleをインポートします。

from sshtunnel import SSHTunnelForwarder
import pymysql as db
import pandas as pd
import datetime

SSHで踏み台サーバーとデータベースサーバーにアクセスするため、SSHTunnelForwarderをインポートします。
MySQLを操作するため、pymysqlをインポートします。
DataFrame形式でデータを操作したいため、pandasをインポートします。
操作時間を計算するため、datetimeをインポートします。

データベースサーバーの接続と検索関数の作成

踏み台経由でデータベースサーバーに接続して、検索SQLを実行する

# ssh
sshOptions = {
    “bastion”: {
        “host”: “host”,
        “ssh_username”: “user”,
        “ssh_private_key”: “~/develop/bastion.pem”,
        “ssh_password”: “password”
    }
}
ssh = sshOptions[“bastion”]
# database
dbOptions = {
    “mysql”: {
        “localhost”: “host”,
        “user”: “user”,
        “password”: “password”,
        “database”: “db”,
        “port”: 3306
    }
}
dbConfig = dbOptions[“mysql”]
def query(q):
    with SSHTunnelForwarder(
    (ssh[“host”], 22),
    ssh_username = ssh[“ssh_username”],
    ssh_password = ssh[“ssh_password”],
    ssh_private_key = ssh[“ssh_private_key”],
    remote_bind_address=(dbConfig[“localhost”], dbConfig[“port”])
    ) as server:
        conn = db.connect(host = ‘127.0.0.1’,
                          port = server.local_bind_port,
                          user = dbConfig[“user”],
                          passwd = dbConfig[“password”],
                          db = dbConfig[“database”],
                          charset = ‘utf8’,
                          cursorclass = db.cursors.DictCursor)
        df = pd.read_sql_query(q, conn)
        conn.close()
        return df

まず、sshOptions、dbOptionsには、踏み台サーバーのIP、ポート、SSH関する設定とデーターベースサーバーのIP、ポートを保持します。

SSHで踏み台サーバーに接続する部分は、SSHTunnelForwarderで処理します。
remote_bind_addressにデーターベースのIPとポートを設定します。
(踏み台を使わないままで直接データーベースに接続したい場合、踏み台の部分をデータベースサーバーの情報に書き換えて、remote_bind_addressの1番目の引数を127.0.0.1にすればOK)

あとは、データを検索する関数query()を作成します。
検索SQL文を引数として関数に渡し、検索を成功に実行したらDataFrame形式の結果を返してくれます。

進捗表示関数の作成

大量なデータ更新を実行している時に進捗を確認したいですので、進捗を表示する関数を作成します。

def log_progress(sequence, every=None, size=None, name=‘Items’):
    from ipywidgets import IntProgress, HTML, VBox
    from IPython.display import display
    is_iterator = False
    if size is None:
        try:
            size = len(sequence)
        except TypeError:
            is_iterator = True
    if size is not None:
        if every is None:
            if size <= 200:
                every = 1
            else:
                every = int(size / 200)     # every 0.5%
    else:
        assert every is not None, ‘sequence is iterator, set every’
    if is_iterator:
        progress = IntProgress(min=0, max=1, value=1)
        progress.bar_style = ‘info’
    else:
        progress = IntProgress(min=0, max=size, value=0)
    label = HTML()
    box = VBox(children=[label, progress])
    display(box)
    index = 0
    try:
        for index, record in enumerate(sequence, 1):
            if index == 1 or index % every == 0:
                if is_iterator:
                    label.value = ‘{name}: {index} / ?‘.format(
                        name=name,
                        index=index
                    )
                else:
                    progress.value = index
                    label.value = u’{name}: {index} / {size}’.format(
                        name=name,
                        index=index,
                        size=size
                    )
            yield record
    except:
        progress.bar_style = ‘danger’
        raise
    else:
        progress.bar_style = ‘success’
        progress.value = index
        label.value = “{name}: {index}“.format(
            name=name,
            index=str(index or ‘?’)
        )

使う例

for url in log_progress(urls[:10], every=1):
    # ループで実行したい処理
    # for example: print(url)

更新処理関数の作成

ここは、複数データを更新する想定です。
接続の処理はquery関数と同じです。

def update(targets):
    with SSHTunnelForwarder(
    (ssh[“host”], 22),
    ssh_username = ssh[“ssh_username”],
    ssh_password = ‘ssh[“ssh_password”]’,
    ssh_private_key = ssh[“ssh_private_key”],
    remote_bind_address=(dbConfig[“localhost”], dbConfig[“port”])
    ) as server:
        conn = db.connect(host = ‘127.0.0.1’,
                          port = server.local_bind_port,
                          user = dbConfig[“user”],
                          passwd = dbConfig[“password”],
                          db = dbConfig[“database”],
                          charset = ‘utf8’,
                          cursorclass = db.cursors.DictCursor)
        cursor = conn.cursor()
        record = []
        for index in log_progress(targets.index, every=1):
            data = targets.loc[index]
            sql2 = “update hoge set value = 0, updated_date=NOW() where id = %d”
            cursor.execute(sql2 % (data[‘id’]))
            if (cursor.rowcount == 1):
                record.append(‘%d, %d, %s’ % (data[‘id’] , data[‘value’], data[‘updated_date’]))
            conn.commit()
        conn.close()
        return record

更新対象を関数の引数として渡し、ループで更新のSQL文を作成して実行します。
ここは単にレコードのvalueとupdated_atの更新になります。

更新を成功した場合、変更前のデータを保存する。
また、更新実行中に進捗を表示したいため、log_progress関数を使用します。

対象レコードを抽出して更新を実行する

ここまで関数作成は完了なりますので、実際に例を実行してみます。

sql = “select id, value, updated_date \
from hoge \
where \
and value < 0 ;”
df = query(sql)
startTime = datetime.datetime.now()
print(startTime, ‘-----‘, ‘’, sep=‘\n’)
re = update(df)
endTime = datetime.datetime.now()
print(‘------‘, endTime, ‘’, sep=‘\n’)
print(‘Time:’, endTime - startTime)
print(‘the data before changing’, ‘;\n’.join(re), sep=‘\n’)

まず、先ほど作成したquery関数で更新したいレコードを抽出します。
ここは、value<0のレコードを抽出します。

その後、抽出したレコードをupdate関数に渡して、更新を実行します。

最後に、実行時間、変更されたデータの変更前の情報を表示します。

これで今回やりたいことを達成しました。

5
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?