LoginSignup
0
0

More than 3 years have passed since last update.

sqlalchemyで暗号化した列を参照する

Last updated at Posted at 2020-12-28

概要

MySQLには列を暗号化するためにaes_encrypt関数とaes_decrypt関数がある。SQL文でこれらの関数を利用する例は多くあるが、sqlalchemyを利用した例は少ない。本記事は暗号化した列をsqlalchemyを利用して参照する例を挙げる。

MS Windowsの場合

MS Windowsを利用している場合は次のようにVARBINARYで暗号化したデータを扱うことができた。しかし、Ubuntuの場合はエラーになりましたのでこの後に説明するように暗号化列もVARCHAR型にしてhex(aes_encrypt())を利用する。

利用するテーブル

名前:server_login_info

No. 列名 PK NN UQ AI コメント
1. id INT(11) 主キー
2. name VARCHAR(20)
3. user VARBINARY(25) ログインユーザ名(暗号化)
4. password VARBINARY(30) ログインパスワード(暗号化)

SQL文

テーブルにサンプルデータを追加する

SET @key_str =unhex(sha2('python',512));

INSERT INTO server_login_info (name, user,password) VALUES ('servicenow', AES_ENCRYPT('ozawa', @key_str),AES_ENCRYPT('password1', @key_str));
INSERT INTO server_login_info (name, user,password) VALUES ('servicehub', AES_ENCRYPT('luffy', @key_str),AES_ENCRYPT('goingmerry', @key_str));
INSERT INTO server_login_info (name, user,password) VALUES ('madokamagica', AES_ENCRYPT('homura', @key_str),AES_ENCRYPT('entropy', @key_str));

行で登録されたことを確認する。

SELECT name, aes_decrypt(user, @key_str) as user, aes_decrypt(password,@key_str) as password FROM server_login_info;

実行結果:

mysql> SELECT name, aes_decrypt(user, @key_str) as user, aes_decrypt(password,@key_str) as password FROM server_login_info;
+--------------+--------+------------+
| name         | user   | password   |
+--------------+--------+------------+
| madokamagica | homura | entropy    |
| servicenow   | ozawa  | password1  |
| servicehub   | luffy  | goingmerry |
+--------------+--------+------------+
3 rows in set (0.00 sec)

sqlalchemyから参照する

先ずも出るを定義する。ポイントは解読したフィールを別に定義することである。
models.py

from sqlalchemy import func, cast
from sqlalchemy.dialects.mysql import CHAR
from sqlalchemy.orm import column_property

class ServerLoginInfo(db.Model):
    __tablename__ = 'server_login_info'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(20), unique=True)
    user = db.Column(db.VARBINARY(25))  # 暗号化されたユーザ名
    password = db.Column(db.VARBINARY(30)) # 暗号化されたパスワード

    user_name = column_property(
        cast(
            func.aes_decrypt(
                user, func.unhex(func.sha2('python', 512))
            ),
            CHAR(charset='utf8mb4'),
        )
    )
    user_password = column_property(
        cast(
            func.aes_decrypt(
                password, func.unhex(func.sha2('python', 512))
            ),
            CHAR(charset='utf8mb4'),
        )
    )

    def __init__(self, name, user, password):
        self.name = name
        self.user = user
        self.password = password

    def __repr__(self):
        return f"pc_info('{self.id}', '{self.name}', '{self.user_name}', " \
               f" '{self.user_password}')"

database.py

from sample.models import ServerLoginInfo

def get_login_info(server_name):
    login_info_dict = {}
    server_login_info = db.session.query(ServerLoginInfo).filter_by(name=server_name).first()

    if server_login_info is None:
        print(f'エラー。指定したサーバが見つかりません。サーバ名: {server_name}')
        return
    if server_login_info.user is not None:
        login_info_dict['user'] = server_login_info.user_name
        login_info_dict['password'] = server_login_info.user_password
    return login_info_dict


if __name__ == '__main__':
    login_info = get_login_info('madokamagica')
    print(login_info)

実行結果:

C:/Users/ozawa/sample/database_api.py
{'user': 'homura', 'password': 'entropy'}

Process finished with exit code 0

Ubuntuの場合

利用するテーブル

名前:server_login_info

No. 列名 PK NN UQ AI コメント
1. id INT(11) 主キー
2. name VARCHAR(20)
3. user VARCHAR(25) ログインユーザ名(暗号化)
4. password VARCHAR(30) ログインパスワード(暗号化)

SQL文

テーブルにサンプルデータを追加する

SET @key_str =unhex(sha2('python',512));

INSERT INTO server_login_info (name, user,password) VALUES ('servicenow', hex(AES_ENCRYPT('ozawa', @key_str)), hex(AES_ENCRYPT('password1', @key_str)));
INSERT INTO server_login_info (name, user,password) VALUES ('servicehub', hex(AES_ENCRYPT('luffy', @key_str)) , hex(AES_ENCRYPT('goingmerry', @key_str)));
INSERT INTO server_login_info (name, user,password) VALUES ('madokamagica', hex(AES_ENCRYPT('homura', @key_str)), hex(AES_ENCRYPT('entropy', @key_str)));

行で登録されたことを確認する。

SELECT name, aes_decrypt(unhex(user), @key_str) as user, aes_decrypt(unhex(password),@key_str) as password FROM server_login_info;

実行結果:

mysql> SELECT name, aes_decrypt(unhex(user), @key_str) as user, aes_decrypt(unhex(password), @key_str) as password FROM server_login_info;
+--------------+--------+------------+
| name         | user   | password   |
+--------------+--------+------------+
| madokamagica | homura | entropy    |
| servicenow   | ozawa  | password1  |
| servicehub   | luffy  | goingmerry |
+--------------+--------+------------+
3 rows in set (0.00 sec)

sqlalchemyから参照する

先ずも出るを定義する。ポイントは解読したフィールを別に定義することである。
models.py

from sqlalchemy import func, cast
from sqlalchemy.dialects.mysql import CHAR
from sqlalchemy.orm import column_property

class ServerLoginInfo(db.Model):
    __tablename__ = 'server_login_info'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(20), unique=True)
    user = db.Column(db.String(25))  # 暗号化されたユーザ名
    password = db.Column(db.String(30)) # 暗号化されたパスワード

    user_name = column_property(
        cast(
            func.aes_decrypt(
                func.unhex(user), func.unhex(func.sha2('python', 512))
            ),
            CHAR(charset='utf8mb4'),
        )
    )
    user_password = column_property(
        cast(
            func.aes_decrypt(
                func.unhex(password), func.unhex(func.sha2('python', 512))
            ),
            CHAR(charset='utf8mb4'),
        )
    )

    def __init__(self, name, user, password):
        self.name = name
        self.user = user
        self.password = password

    def __repr__(self):
        return f"pc_info('{self.id}', '{self.name}', '{self.user_name}', " \
               f" '{self.user_password}')"

database.py

from sample.models import ServerLoginInfo

def get_login_info(server_name):
    login_info_dict = {}
    server_login_info = db.session.query(ServerLoginInfo).filter_by(name=server_name).first()

    if server_login_info is None:
        print(f'エラー。指定したサーバが見つかりません。サーバ名: {server_name}')
        return
    if server_login_info.user is not None:
        login_info_dict['user'] = server_login_info.user_name
        login_info_dict['password'] = server_login_info.user_password
    return login_info_dict


if __name__ == '__main__':
    login_info = get_login_info('madokamagica')
    print(login_info)

以上

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0