LoginSignup
0
1

More than 3 years have passed since last update.

pypyodbcを使って手っ取り早くAS/400からデータを取得してみた 仕込編1

Last updated at Posted at 2019-11-24

前回の記事、pypyodbcを使って手っ取り早くAS/400からデータを取得してみたでIBMi へのODBC接続ができるようになってので、いろいろやりたいことが出てきたのですが、考えてみたら仕込みが必要なので、今回その辺を作成してみました。

「その1」となっているのは、まだ道半ばで、入力部分の処理だけしか出来てないからです。
出力の方は実機がないと検証が難しいので、今回はパスしています。

実験環境

  • Windows7 (今回は実際の接続は行わない部分なので、自宅PCで検証)
  • Python 3.8
  • ipyhon 7.9.0 (いろいろ検証するために使用)

ヘルパーモジュール

後で使い倒したろうと考え、"sql_helper.py"というファイル(モジュール)にいろいろなものを詰め込んでいきます。パッケージを探せば見つかりそうな基本的な機能ですが、Pythonの復習をかねて車輪を再発明してみました。

今のところはこんな感じ。

sql_helper.py
def _load_lines(filename, encoding='utf-8'):
    return [ l for l in open(filename, 'r', -1, encoding) ]


def load_raw_string(filename, encoding='utf-8', rstrip=True, lstrip=False):
    def _echo(x): return x

    if   (    rstrip  and      lstrip): func = str.strip
    elif (not rstrip  and      lstrip): func = str.lstrip
    elif (    rstrip  and  not lstrip): func = str.rstrip    # default
    else:                               func = _echo         # dummy

    if  rstrip: term = "\n"  # cause rstrip(None) is remove "\n"!
    else:       term = ""

    s = _load_lines(filename, encoding)
    return term.join( list(map(lambda x: func(x), s)) )


def load_sql(filename, encoding='utf-8', lstrip=False):
    sts = []
    for st in strip_comment( load_raw_string(filename, encoding, True, lstrip) ).split(";\n"):
        if (st.isspace()  or  st == "" ): continue
        sts.append(st.strip("; \t\n") )
    return sts


def load_config(filename, encoding='utf-8'):
    cf = {}
    for line in strip_comment(load_raw_string(filename, encoding)).split("\n"):
        if ( line == ""  or  line.isspace() ): continue
        key_, val_ = line.split("=")
        cf[ key_.strip() ] = val_.strip()
    return cf

# ...もう少しだけ続きます...

簡単に説明します。

_load_lines()

指定されたファイルを読み込んで。単純に1行1要素のリストを作ってお終い。毎回open(xxx,...).readline()を書きたくないだけです。

load_raw_string()

上の _load_line() を使って取得したリストを読み込んで、行ごとにlstrip()、rstrip()を掛けた後、単独の文字列に再構成して返しています。
行頭の空白削除と、行末の空白削除の要否を選択できた方が良いかな?という安易な決断をした挙句、コードが大きくなりました。_load_lines()の前は、そのために追加することになった機能です。(リターン文を一行で終わらせたい為、とも言います)
func はパラメータによって、str.lstrip、str.rstrip、str.strip、_echo(入力値を返すだけ)を入れ替えています。
str.rstrip、str.strip を引数なしで呼ぶと行末にある改行も消えてしまうので、行末削除を指定したときは、term に改行記号を、そうでなければ、空白文字を入れて return文で再構成しています。

load_sql()

filename から、SQLを取得します。この中に";"(+改行)が含まれていれば、そこで分割してSQL文のlistにして戻せばお終い。

と考えたまではよかったのですが、これを実現するために、延々ヘルパーのヘルパーを作る羽目に...。
例えば、コメントの中の";\n"でブった切られるとまずいだろ?とか、引用符の中にある";\n"でぶった切られるとまずいでしょ? とかです。
複合ケースとして、...; /*ブロックコメント*/ となっていると、コメント除去した後に、もう一度行末の空白を除去しなきゃ";\n"にマッチしない! 等々...。

最終的にやっていることは、こんな感じです
1. 先ほどの load_row_string() を使って行末空白を除去
2. strip_comment() (後述)でコメントを除去、ついでにコメント除去した結果、行末になってしまった空白を除去
3. ここでやっと";\n" でSQL単独文に分割
4. 各文末の";"そのものを除去
5. 4.をリストに詰め直して返す

普通にSQLの簡易字句解析器を作った方が早かったかもしれません。

load_config()

以下の形式のファイルを読み込むと...

sample_connection.txt
/* supports sql style comments */
  driver = {iSeries Access ODBC Driver}
    system = 127.0.0.1  
    uid    = USER01      
    pwd    = USER01     

DefaultLibraries = MYLIB,TESTLIB1,TESTLIB2library

こんな感じの辞書に変換します。

{'DefaultLibraries': 'MYLIB,TESTLIB1,TESTLIB2',
 'driver': '{iSeries Access ODBC Driver}',
 'pwd': 'USER01',
 'system': '127.0.0.1',
 'uid': 'USER01'}

この辞書を pypyodbc.connect() 関数に渡してやれば、ずらずらとパラメータを書き連ねる必要もありません。

odbctest.py
from   pypyodbc import connect
import sql_helper as helper

config = helper.load_config( "sample_connection.txt" )

with connect( **config ) as connection:
    # データベースにアクセス処理...

strip_comment()

ソースが長くなるので、はしょっていたコメント除去機能になります。

sql_helper.py
from  enum  import Enum, auto
class States(Enum):
    Statement = auto()
    Quoted    = auto()
    Comment   = auto()
    End       = auto()

_LimitLen = 999999999

def _singlequote_begin(s, b):
    p = s.find("'", b)
    return p   if (p >= 0) else _LimitLen

def _singlequote_end(s, b):
    p = s.find("'", b)
    if (p >= 0  and  p == s.find("''", b)):  p = _singlequote_end(s, p + 2)    # find recursive
    return (_next, p, 0, States.Statement)   if (p >= 0) else (None. _LimitLen, 0, States.End)

def _doublequote_begin(s, b):
    p = s.find('"', b)
    return p   if (p >= 0) else _LimitLen

def _doublequote_end(s, b):
    p = s.find('"', b)
    return (_next, p, 0, States.Statement)  if (p >= 0) else (None, _LimitLen, 0, States.End)

def _block_comment_begin(s, b):
    p = s.find("/*", b)
    return p + 1  if (p >= 0) else _LimitLen

def _block_comment_end (s, b):
    p = s.find("*/", b)
    return (_next, p + len("*/") - 1, len("*/") -1, States.Statement)  if (p >= 0) else (None, _LimitLen, 0, States.End)

def _line_comment_begin(s, b):
    p = s.find("--", b)
    return p + 1  if (p >= 0) else _LimitLen

def _line_comment_end(s, b):
    p =  s.find("\n", b)
    return (_next, p + len("\n") - 1, len("\n") - 1, States.Statement)   if (p >= 0) else (None, _LimitLen, 0, States.End)


def _quote_begin(s, b):
    next_state = States.Quoted
    sq, dq = _singlequote_begin(s, b), _doublequote_begin(s, b)
    if  (min(sq, dq) == _LimitLen): next_state = States.End
    return (_singlequote_end, sq, 0, next_state)  if (sq < dq) else (_doublequote_end, dq, 0, next_state)

def _comment_begin(s, b):
    bc, lc = _block_comment_begin(s, b), _line_comment_begin(s, b)
    if  (min(bc, lc) == _LimitLen): next_ = States.End
    return (_line_comment_end, lc, 0, States.Comment)    if (lc < bc) else  (_block_comment_end, bc, 0, States.Comment)

def _next(s, b):
    q_func, q_pos, q_ad, q_st = _quote_begin(s, b)
    c_func, c_pos, c_ad, c_st = _comment_begin(s, b)
    return  (q_func, q_pos, 0, q_st)  if (q_pos < c_pos) else (c_func, c_pos, 0, c_st)


def strip_comment( st ):
    # 短縮評価
    if st == None  or  len( st.strip() ) == 0: return ""
    if ("/*" not in st)  and  ("--" not in st): return  "\n".join( list ( map(lambda x: x.rstrip(), st.split("\n"))))

    chars = list( st )

    comments = _find_comment_pos( st )
    comments.reverse()
    for c in comments:  del chars[ c[0]: c[1]]

    return "\n".join( list( map(lambda x: x.rstrip() , "".join( chars ).split("\n")) ) )


def _find_comment_pos( st ):
    cur   = -1
    begin =  0
    comments = []

    state = States.Statement
    pstate = None
    func = _next

    while (True):
        func, pos, adv, state = func(st, cur + 1)

        # 検索終了
        if  ( state == States.End):
            if  (pstate == States.Comment):
                comments.append((begin, len(st)))
            break

        cur = pos

        # コメント開始 -> 後続処理をスキップ
        if  (state == States.Quoted):
            pstate = state
            continue

        # end comment
        if  (pstate == States.Comment):
            comments.append((begin, pos + adv))
            pstate = state
            continue

        # begin comment
        if  (state == States.Comment):
            begin = pos - 1  # previous a length of single/double quotation

        pstate = state

    return comments

もう、自分で何を書いたか覚えてないので、間単に説明します。

概要

strip_comment() と、コメント位置を検索して(開始位置,終了位置)のリストを返す関数 _find_comment_pos() 関数が主要な関数で、その前にズラズラ並んだ関数、オブジェクトは。_find_comment_pos()が利用しているヘルパです。
雑多な関数群は、もともと _find_comment() の内部関数として定義していましたが、こうすると、
func, pos, adv, state = func(st, cur + 1) を実行したときに、先頭の func (次の検索すべき関数が返される)が、通常の関数オブジェクトではなく、3項目のタプルになっているようです。
この対処法を調べるのが大変そうなので、素直に通常の関数に変更しています。(そのせいで余計な関数をモジュール内に撒き散らすことに...)

_find_comment_pos()

ローカル変数 '''func``` は、次に検索に使用する関数を保持しています。現在のステータスと、ヒットした文字("/*"とか"'"とか改行とか) で場合分けしても実現できますが、条件分岐が酷いことになりそうなので、ヒットした文字に応じて適切な検索関数を返すようにしています。
例) "/
"が見つかったら、次に探すのは"*/"と決まるので、それを探す関数 _block_comment_end() が決まる

このとき一緒に、ヒットした位置を返せばコト足りるかな? と思いましたが、やっぱりこの情報がほしい、これも必要となって、各関数からは4つ値を返す羽目に...。
その後は、コメント位置をスライスで指定きるように数値を調整して、listに詰め込んでいます。
(もう詳細は忘れた..)

strip_comment()

コメント位置が定まったので、後はスライスを使ってコメントを除去するだけ!!

string クラスには、スライス位置の文字を除去するメソッドがない!!

仕方がないので、chars = list( st ) で文字のリストに直し、そのリストに対しdel chars[begin:end] することでやっと念願のアイスソーd、じゃなくてコメントの除去に成功しました。

最後のリターン文ですが、コメント除去の後の行末空白("本文; /* コメント */\n" パターン)を更に消し去る為に、また回りくどい処理を書くことになりました。

最後に

テストに使用したファイルと、テストコードです。(sample_connection.txt)は掲載済みのため省略)

sample_multi_statement.sql
select * from SYSLIBL 
  where TYPE not like = "';%"   ;/* comment */

select * from SYSTABLES  ;   /* "" ' '' ; ;
" */

  ;  -- empty statement

select * from SYSCOLUMNS        ;
select * from SYSIBM.SYSDUMMY1;

strip_commentの短絡評価確認用

sample_multi_statement.sql
select * from SYSLIBL 
  where TYPE not like = "';%"   ;

select * from SYSTABLES  ;   

;

select * from SYSCOLUMNS        ;   
select * from SYSIBM.SYSDUMMY1;
sql_helper.py
def test_loading():
    from pprint import pprint

    file = "sample_multi_statement.sql"
    print( f'\ntest load_sql("{file}") ----------------------------')
    sts = load_sql( file )
    pprint( sts )

    file = "sample_multi_statement2.sql"
    print( f'\ntest load_sql("{file}") ----------------------------')
    sts = load_sql( file )
    pprint( sts )

    file = "sample_config.txt"
    print( f'\ntest load_config("{file}") ----------------------------')
    config = load_config( file )
    pprint( config )

読み込み処理だけで予想以上のコード量になりました。
IBM i での使用を前提に作成を始めましたが、ここまでの内容でしたら、他のDBMSの前処理として流用できると思います。

次回があれば、出力側に取り掛かります。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1