#概要
MySQLには列を暗号化するためにaes_encrypt関数とaes_decrypt関数がある。SQL文でこれらの関数を利用する例は多くあるが、sqlalchemyを利用した例は少ない。本記事は暗号化した列をsqlalchemyを利用して参照する例を挙げる。
#MS Windowsの場合
MS Windowsを利用している場合は次のようにVARBINARYで暗号化したデータを扱うことができた。しかし、Ubuntuの場合はエラーになりましたのでこの後に説明するように暗号化列もVARCHAR型にしてhex(aes_encrypt())を利用する。
##利用するテーブル
名前:server_login_info
No. | 列名 | 型 | PK | NN | UQ | AI | コメント |
---|---|---|---|---|---|---|---|
1. | id | INT(11) | ☑ | ☑ | ☑ | 主キー | |
2. | name | VARCHAR(20) | ☑ | ☑ | |||
3. | user | VARBINARY(25) | ログインユーザ名(暗号化) | ||||
4. | password | VARBINARY(30) | ログインパスワード(暗号化) |
##SQL文
テーブルにサンプルデータを追加する
SET @key_str =unhex(sha2('python',512));
INSERT INTO server_login_info (name, user,password) VALUES ('servicenow', AES_ENCRYPT('ozawa', @key_str),AES_ENCRYPT('password1', @key_str));
INSERT INTO server_login_info (name, user,password) VALUES ('servicehub', AES_ENCRYPT('luffy', @key_str),AES_ENCRYPT('goingmerry', @key_str));
INSERT INTO server_login_info (name, user,password) VALUES ('madokamagica', AES_ENCRYPT('homura', @key_str),AES_ENCRYPT('entropy', @key_str));
行で登録されたことを確認する。
SELECT name, aes_decrypt(user, @key_str) as user, aes_decrypt(password,@key_str) as password FROM server_login_info;
実行結果:
mysql> SELECT name, aes_decrypt(user, @key_str) as user, aes_decrypt(password,@key_str) as password FROM server_login_info;
+--------------+--------+------------+
| name | user | password |
+--------------+--------+------------+
| madokamagica | homura | entropy |
| servicenow | ozawa | password1 |
| servicehub | luffy | goingmerry |
+--------------+--------+------------+
3 rows in set (0.00 sec)
##sqlalchemyから参照する
先ずも出るを定義する。ポイントは解読したフィールを別に定義することである。
models.py
from sqlalchemy import func, cast
from sqlalchemy.dialects.mysql import CHAR
from sqlalchemy.orm import column_property
class ServerLoginInfo(db.Model):
__tablename__ = 'server_login_info'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(20), unique=True)
user = db.Column(db.VARBINARY(25)) # 暗号化されたユーザ名
password = db.Column(db.VARBINARY(30)) # 暗号化されたパスワード
user_name = column_property(
cast(
func.aes_decrypt(
user, func.unhex(func.sha2('python', 512))
),
CHAR(charset='utf8mb4'),
)
)
user_password = column_property(
cast(
func.aes_decrypt(
password, func.unhex(func.sha2('python', 512))
),
CHAR(charset='utf8mb4'),
)
)
def __init__(self, name, user, password):
self.name = name
self.user = user
self.password = password
def __repr__(self):
return f"pc_info('{self.id}', '{self.name}', '{self.user_name}', " \
f" '{self.user_password}')"
database.py
from sample.models import ServerLoginInfo
def get_login_info(server_name):
login_info_dict = {}
server_login_info = db.session.query(ServerLoginInfo).filter_by(name=server_name).first()
if server_login_info is None:
print(f'エラー。指定したサーバが見つかりません。サーバ名: {server_name}')
return
if server_login_info.user is not None:
login_info_dict['user'] = server_login_info.user_name
login_info_dict['password'] = server_login_info.user_password
return login_info_dict
if __name__ == '__main__':
login_info = get_login_info('madokamagica')
print(login_info)
実行結果:
C:/Users/ozawa/sample/database_api.py
{'user': 'homura', 'password': 'entropy'}
Process finished with exit code 0
#Ubuntuの場合
##利用するテーブル
名前:server_login_info
No. | 列名 | 型 | PK | NN | UQ | AI | コメント |
---|---|---|---|---|---|---|---|
1. | id | INT(11) | ☑ | ☑ | ☑ | 主キー | |
2. | name | VARCHAR(20) | ☑ | ☑ | |||
3. | user | VARCHAR(25) | ログインユーザ名(暗号化) | ||||
4. | password | VARCHAR(30) | ログインパスワード(暗号化) |
##SQL文
テーブルにサンプルデータを追加する
SET @key_str =unhex(sha2('python',512));
INSERT INTO server_login_info (name, user,password) VALUES ('servicenow', hex(AES_ENCRYPT('ozawa', @key_str)), hex(AES_ENCRYPT('password1', @key_str)));
INSERT INTO server_login_info (name, user,password) VALUES ('servicehub', hex(AES_ENCRYPT('luffy', @key_str)) , hex(AES_ENCRYPT('goingmerry', @key_str)));
INSERT INTO server_login_info (name, user,password) VALUES ('madokamagica', hex(AES_ENCRYPT('homura', @key_str)), hex(AES_ENCRYPT('entropy', @key_str)));
行で登録されたことを確認する。
SELECT name, aes_decrypt(unhex(user), @key_str) as user, aes_decrypt(unhex(password),@key_str) as password FROM server_login_info;
実行結果:
mysql> SELECT name, aes_decrypt(unhex(user), @key_str) as user, aes_decrypt(unhex(password), @key_str) as password FROM server_login_info;
+--------------+--------+------------+
| name | user | password |
+--------------+--------+------------+
| madokamagica | homura | entropy |
| servicenow | ozawa | password1 |
| servicehub | luffy | goingmerry |
+--------------+--------+------------+
3 rows in set (0.00 sec)
##sqlalchemyから参照する
先ずも出るを定義する。ポイントは解読したフィールを別に定義することである。
models.py
from sqlalchemy import func, cast
from sqlalchemy.dialects.mysql import CHAR
from sqlalchemy.orm import column_property
class ServerLoginInfo(db.Model):
__tablename__ = 'server_login_info'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(20), unique=True)
user = db.Column(db.String(25)) # 暗号化されたユーザ名
password = db.Column(db.String(30)) # 暗号化されたパスワード
user_name = column_property(
cast(
func.aes_decrypt(
func.unhex(user), func.unhex(func.sha2('python', 512))
),
CHAR(charset='utf8mb4'),
)
)
user_password = column_property(
cast(
func.aes_decrypt(
func.unhex(password), func.unhex(func.sha2('python', 512))
),
CHAR(charset='utf8mb4'),
)
)
def __init__(self, name, user, password):
self.name = name
self.user = user
self.password = password
def __repr__(self):
return f"pc_info('{self.id}', '{self.name}', '{self.user_name}', " \
f" '{self.user_password}')"
database.py
from sample.models import ServerLoginInfo
def get_login_info(server_name):
login_info_dict = {}
server_login_info = db.session.query(ServerLoginInfo).filter_by(name=server_name).first()
if server_login_info is None:
print(f'エラー。指定したサーバが見つかりません。サーバ名: {server_name}')
return
if server_login_info.user is not None:
login_info_dict['user'] = server_login_info.user_name
login_info_dict['password'] = server_login_info.user_password
return login_info_dict
if __name__ == '__main__':
login_info = get_login_info('madokamagica')
print(login_info)
以上