前回の記事、pypyodbcを使って手っ取り早くAS/400からデータを取得してみたでIBMi へのODBC接続ができるようになってので、いろいろやりたいことが出てきたのですが、考えてみたら仕込みが必要なので、今回その辺を作成してみました。
「その1」となっているのは、まだ道半ばで、入力部分の処理だけしか出来てないからです。
出力の方は実機がないと検証が難しいので、今回はパスしています。
実験環境
- Windows7 (今回は実際の接続は行わない部分なので、自宅PCで検証)
- Python 3.8
- ipyhon 7.9.0 (いろいろ検証するために使用)
ヘルパーモジュール
後で使い倒したろうと考え、"sql_helper.py"というファイル(モジュール)にいろいろなものを詰め込んでいきます。パッケージを探せば見つかりそうな基本的な機能ですが、Pythonの復習をかねて車輪を再発明してみました。
今のところはこんな感じ。
def _load_lines(filename, encoding='utf-8'):
return [ l for l in open(filename, 'r', -1, encoding) ]
def load_raw_string(filename, encoding='utf-8', rstrip=True, lstrip=False):
def _echo(x): return x
if ( rstrip and lstrip): func = str.strip
elif (not rstrip and lstrip): func = str.lstrip
elif ( rstrip and not lstrip): func = str.rstrip # default
else: func = _echo # dummy
if rstrip: term = "\n" # cause rstrip(None) is remove "\n"!
else: term = ""
s = _load_lines(filename, encoding)
return term.join( list(map(lambda x: func(x), s)) )
def load_sql(filename, encoding='utf-8', lstrip=False):
sts = []
for st in strip_comment( load_raw_string(filename, encoding, True, lstrip) ).split(";\n"):
if (st.isspace() or st == "" ): continue
sts.append(st.strip("; \t\n") )
return sts
def load_config(filename, encoding='utf-8'):
cf = {}
for line in strip_comment(load_raw_string(filename, encoding)).split("\n"):
if ( line == "" or line.isspace() ): continue
key_, val_ = line.split("=")
cf[ key_.strip() ] = val_.strip()
return cf
# ...もう少しだけ続きます...
簡単に説明します。
_load_lines()
指定されたファイルを読み込んで。単純に1行1要素のリストを作ってお終い。毎回open(xxx,...).readline()
を書きたくないだけです。
load_raw_string()
上の _load_line()
を使って取得したリストを読み込んで、行ごとにlstrip()、rstrip()を掛けた後、単独の文字列に再構成して返しています。
行頭の空白削除と、行末の空白削除の要否を選択できた方が良いかな?という安易な決断をした挙句、コードが大きくなりました。_load_lines()の前は、そのために追加することになった機能です。(リターン文を一行で終わらせたい為、とも言います)
func
はパラメータによって、str.lstrip、str.rstrip、str.strip、_echo(入力値を返すだけ)を入れ替えています。
str.rstrip、str.strip を引数なしで呼ぶと行末にある改行も消えてしまうので、行末削除を指定したときは、term
に改行記号を、そうでなければ、空白文字を入れて return文で再構成しています。
load_sql()
filename から、SQLを取得します。この中に";"(+改行)が含まれていれば、そこで分割してSQL文のlistにして戻せばお終い。
と考えたまではよかったのですが、これを実現するために、延々ヘルパーのヘルパーを作る羽目に...。
例えば、コメントの中の";\n"でブった切られるとまずいだろ?とか、引用符の中にある";\n"でぶった切られるとまずいでしょ? とかです。
複合ケースとして、...; /*ブロックコメント*/
となっていると、コメント除去した後に、もう一度行末の空白を除去しなきゃ";\n"にマッチしない! 等々...。
最終的にやっていることは、こんな感じです
1. 先ほどの load_row_string() を使って行末空白を除去
2. strip_comment() (後述)でコメントを除去、ついでにコメント除去した結果、行末になってしまった空白を除去
3. ここでやっと";\n" でSQL単独文に分割
4. 各文末の";"そのものを除去
5. 4.をリストに詰め直して返す
普通にSQLの簡易字句解析器を作った方が早かったかもしれません。
load_config()
以下の形式のファイルを読み込むと...
/* supports sql style comments */
driver = {iSeries Access ODBC Driver}
system = 127.0.0.1
uid = USER01
pwd = USER01
DefaultLibraries = MYLIB,TESTLIB1,TESTLIB2library
こんな感じの辞書に変換します。
{'DefaultLibraries': 'MYLIB,TESTLIB1,TESTLIB2',
'driver': '{iSeries Access ODBC Driver}',
'pwd': 'USER01',
'system': '127.0.0.1',
'uid': 'USER01'}
この辞書を pypyodbc.connect() 関数に渡してやれば、ずらずらとパラメータを書き連ねる必要もありません。
from pypyodbc import connect
import sql_helper as helper
config = helper.load_config( "sample_connection.txt" )
with connect( **config ) as connection:
# データベースにアクセス処理...
strip_comment()
ソースが長くなるので、はしょっていたコメント除去機能になります。
from enum import Enum, auto
class States(Enum):
Statement = auto()
Quoted = auto()
Comment = auto()
End = auto()
_LimitLen = 999999999
def _singlequote_begin(s, b):
p = s.find("'", b)
return p if (p >= 0) else _LimitLen
def _singlequote_end(s, b):
p = s.find("'", b)
if (p >= 0 and p == s.find("''", b)): p = _singlequote_end(s, p + 2) # find recursive
return (_next, p, 0, States.Statement) if (p >= 0) else (None. _LimitLen, 0, States.End)
def _doublequote_begin(s, b):
p = s.find('"', b)
return p if (p >= 0) else _LimitLen
def _doublequote_end(s, b):
p = s.find('"', b)
return (_next, p, 0, States.Statement) if (p >= 0) else (None, _LimitLen, 0, States.End)
def _block_comment_begin(s, b):
p = s.find("/*", b)
return p + 1 if (p >= 0) else _LimitLen
def _block_comment_end (s, b):
p = s.find("*/", b)
return (_next, p + len("*/") - 1, len("*/") -1, States.Statement) if (p >= 0) else (None, _LimitLen, 0, States.End)
def _line_comment_begin(s, b):
p = s.find("--", b)
return p + 1 if (p >= 0) else _LimitLen
def _line_comment_end(s, b):
p = s.find("\n", b)
return (_next, p + len("\n") - 1, len("\n") - 1, States.Statement) if (p >= 0) else (None, _LimitLen, 0, States.End)
def _quote_begin(s, b):
next_state = States.Quoted
sq, dq = _singlequote_begin(s, b), _doublequote_begin(s, b)
if (min(sq, dq) == _LimitLen): next_state = States.End
return (_singlequote_end, sq, 0, next_state) if (sq < dq) else (_doublequote_end, dq, 0, next_state)
def _comment_begin(s, b):
bc, lc = _block_comment_begin(s, b), _line_comment_begin(s, b)
if (min(bc, lc) == _LimitLen): next_ = States.End
return (_line_comment_end, lc, 0, States.Comment) if (lc < bc) else (_block_comment_end, bc, 0, States.Comment)
def _next(s, b):
q_func, q_pos, q_ad, q_st = _quote_begin(s, b)
c_func, c_pos, c_ad, c_st = _comment_begin(s, b)
return (q_func, q_pos, 0, q_st) if (q_pos < c_pos) else (c_func, c_pos, 0, c_st)
def strip_comment( st ):
# 短縮評価
if st == None or len( st.strip() ) == 0: return ""
if ("/*" not in st) and ("--" not in st): return "\n".join( list ( map(lambda x: x.rstrip(), st.split("\n"))))
chars = list( st )
comments = _find_comment_pos( st )
comments.reverse()
for c in comments: del chars[ c[0]: c[1]]
return "\n".join( list( map(lambda x: x.rstrip() , "".join( chars ).split("\n")) ) )
def _find_comment_pos( st ):
cur = -1
begin = 0
comments = []
state = States.Statement
pstate = None
func = _next
while (True):
func, pos, adv, state = func(st, cur + 1)
# 検索終了
if ( state == States.End):
if (pstate == States.Comment):
comments.append((begin, len(st)))
break
cur = pos
# コメント開始 -> 後続処理をスキップ
if (state == States.Quoted):
pstate = state
continue
# end comment
if (pstate == States.Comment):
comments.append((begin, pos + adv))
pstate = state
continue
# begin comment
if (state == States.Comment):
begin = pos - 1 # previous a length of single/double quotation
pstate = state
return comments
もう、自分で何を書いたか覚えてないので、間単に説明します。
概要
strip_comment() と、コメント位置を検索して(開始位置,終了位置)のリストを返す関数 _find_comment_pos() 関数が主要な関数で、その前にズラズラ並んだ関数、オブジェクトは。_find_comment_pos()が利用しているヘルパです。
雑多な関数群は、もともと _find_comment() の内部関数として定義していましたが、こうすると、
func, pos, adv, state = func(st, cur + 1)
を実行したときに、先頭の func (次の検索すべき関数が返される)が、通常の関数オブジェクトではなく、3項目のタプルになっているようです。
この対処法を調べるのが大変そうなので、素直に通常の関数に変更しています。(そのせいで余計な関数をモジュール内に撒き散らすことに...)
_find_comment_pos()
ローカル変数 '''func``` は、次に検索に使用する関数を保持しています。現在のステータスと、ヒットした文字("/*"とか"'"とか改行とか) で場合分けしても実現できますが、条件分岐が酷いことになりそうなので、ヒットした文字に応じて適切な検索関数を返すようにしています。
例) "/"が見つかったら、次に探すのは"*/"と決まるので、それを探す関数 _block_comment_end() が決まる
このとき一緒に、ヒットした位置を返せばコト足りるかな? と思いましたが、やっぱりこの情報がほしい、これも必要となって、各関数からは4つ値を返す羽目に...。
その後は、コメント位置をスライスで指定きるように数値を調整して、listに詰め込んでいます。
(もう詳細は忘れた..)
strip_comment()
コメント位置が定まったので、後はスライスを使ってコメントを除去するだけ!!
string クラスには、スライス位置の文字を除去するメソッドがない!!
仕方がないので、chars = list( st )
で文字のリストに直し、そのリストに対しdel chars[begin:end]
することでやっと念願のアイスソーd、じゃなくてコメントの除去に成功しました。
最後のリターン文ですが、コメント除去の後の行末空白("本文; /* コメント */\n" パターン)を更に消し去る為に、また回りくどい処理を書くことになりました。
最後に
テストに使用したファイルと、テストコードです。(sample_connection.txt)は掲載済みのため省略)
select * from SYSLIBL
where TYPE not like = "';%" ;/* comment */
select * from SYSTABLES ; /* "" ' '' ; ;
" */
; -- empty statement
select * from SYSCOLUMNS ;
select * from SYSIBM.SYSDUMMY1;
strip_commentの短絡評価確認用
select * from SYSLIBL
where TYPE not like = "';%" ;
select * from SYSTABLES ;
;
select * from SYSCOLUMNS ;
select * from SYSIBM.SYSDUMMY1;
def test_loading():
from pprint import pprint
file = "sample_multi_statement.sql"
print( f'\ntest load_sql("{file}") ----------------------------')
sts = load_sql( file )
pprint( sts )
file = "sample_multi_statement2.sql"
print( f'\ntest load_sql("{file}") ----------------------------')
sts = load_sql( file )
pprint( sts )
file = "sample_config.txt"
print( f'\ntest load_config("{file}") ----------------------------')
config = load_config( file )
pprint( config )
読み込み処理だけで予想以上のコード量になりました。
IBM i での使用を前提に作成を始めましたが、ここまでの内容でしたら、他のDBMSの前処理として流用できると思います。
次回があれば、出力側に取り掛かります。