1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

EDINET API v2をFastAPIでラップしてAirbyteで読み込む

Last updated at Posted at 2024-07-21

EDINETから有価証券報告書読み込むのいいですよね。ありがたい限り。でもAPIの仕様がいろいろ微妙でAPIコネクタでそのまま読み込むのはなんかつらいです。v1のときよりはだいぶマシですが。

本記事ではFastAPIでEDINETのAPIをラップしてしまって自分好みのAPI仕様に変えることで、Airbyteのカスタムコネクタでスムーズに読み込ませるってのを試しています。

FastAPIの説明とかは不要だと思うのでわからなかったらチュートリアルが最強に親切なのでチュートリアル見てください。

EDINETについてはこの辺の記事を参考にさせてもらっています。
EDINETの書類情報をMySQLに書き込む
EDINETのデータを利用して、BS/PLを可視化する

EDINET APIの仕様などはこちらの記事などをみてくださいな。

なお、雑なコードはこちらにおいてあります。
https://github.com/myu65/edinet_api_rapper

FastAPIでラップする

細かいコードの説明はもうしませんが、自分がお気に召す感じに加工しています。
超絶雑実装なのであれですが、最終系では、

from_dateto_dateで範囲を指定しつつ1日分だけ返してnext_page_tokenでデータのある次の日を返す感じの仕様です。この仕様だとAirbyteでいい感じにデータを読み込めます。そうじゃないとなんか全日付とってきてみたりとってこなかったりややこしかった。

かつ、docIDでとれるデータの中身をダウンロードしてきて、zipを解凍してjsonに直してdataとして元のテーブル形式のメタデータに足して入れています。別々に取ろうとしたけど、こっちの方が圧倒的にハンドリングが楽。


@app.get("/edinet_yukashoken")
def get_edinet_json(
    from_date :date,
    to_date: date,
    next_page_token: date = None,
    api_key: str = Security(get_api_key),
    ):
    """EDINETを日付範囲で有価証券報告書を1テーブルで返す。範囲を決めたうえで半ページ送り式でnext_tokenには日付が入る。返ってくるnext_tokenがnoneならおわり。"""

    # 参考
    # https://note.com/python_beginner/n/na0e51d80bc35

    edinet_api_key = os.getenv("EDINET_API_KEY")
    URL = "https://disclosure.edinet-fsa.go.jp/api/v2/documents.json"

    # metadataをとる

    if next_page_token:
        # 最初はNoneが来ることになってるのでそうじゃなかったら続きから
        target_date = next_page_token
    else:
        target_date = from_date

    # メタデータ部分とる。適当。
    while target_date <= to_date:

        params = {
            "date" : target_date,
            "type" : 2,
            "Subscription-Key" : edinet_api_key
        }

        res = requests.get(URL, params = params)
        data = res.json()


        # 有価証券報告書だけとりだす
        data = [doc for doc in data['results'] if doc['docDescription'] is not None and '有価証券報告書' in doc['docDescription']]


        # 次の日付に送っておく
        target_date += timedelta(days=1)
        next_page_token = target_date

        if not data:
            # 空だったらループを継続。
            continue

        # データのある1日単位ごとに返す。
        break

    # doc部分とる
    for doc in data:
        doc_data = edinet_doc.get_edinet_doc_json(doc["docID"])
        doc["data"] = doc_data


    if next_page_token >= to_date:
        next_page_token = None

    return {'data': data, 'next_page_token':next_page_token}

ちなみにZipを解凍して返すところで圧倒的にはまったポイントは、csvの文字コードがutf16-leだったことでした。複数ファイルをまとめて読み込むのにduckdbが楽なのでduckdb使ったのに文字コードで読み込めなくて、utf16をutf8に変換するコードかませたせいで、順番にpandasで読み込むのと実装の手間が同じになってしまった。おしまい!


def get_edinet_doc_json(
    doc_id :str,
    
    ):
    """EDINETを叩いてまずはdocidの中身を解凍して返す
    日付を何かテーブルに入れたいけどめんどくさい。
    """

    # 参考
    # https://note.com/python_beginner/n/na0e51d80bc35
    # https://qiita.com/kj1729/items/88b2ffc3e21b98c91aea

    edinet_api_key = os.getenv("EDINET_API_KEY")
    URL = f"https://disclosure.edinet-fsa.go.jp/api/v2/documents/{doc_id}"

    params = {
        "type" : 5,
        "Subscription-Key" : edinet_api_key
    }

    dir_id = doc_id + str(uuid.uuid4())
    
    # https://qiita.com/nujust/items/9cb4564e712720549bc1
    extract_dir = f"downloads/{dir_id}"

    try:
        with (
            requests.get(URL, params = params) as res,
            io.BytesIO(res.content) as bytes_io,
            zipfile.ZipFile(bytes_io) as zip,
        ):
            zip.extractall(extract_dir)

    except:
        # たぶんステータス404が返ってきてる
        data = res.json()
        print(data)
        return None


    path_list = glob(f"downloads/{dir_id}/XBRL_TO_CSV/*")
    utf_converter.utf16_to_utf8(path_list)

    duckdb.read_csv(f"downloads/{dir_id}/XBRL_TO_CSV/*.csv")    

    data = duckdb.sql(f"""select * from 'downloads/{dir_id}/XBRL_TO_CSV/*.csv'""").to_df().to_dict('records')
    shutil.rmtree(f"downloads/{dir_id}")

    return data

で、こんな感じで/edinet_yukashokenエンドポイントを作ったら、Airbyteのカスタムコネクタで読み込みます。

Airbyte

Airbyte自体の説明は公式ページや
Airbyte公式
いろんな記事が参考になります
自前でデータパイプラインをサクッと構築できる「Airbyte」を試してみた
EC2 にインストールした Airbyte でデータ連携を試してみた
(https://zenn.dev/gak_t12/articles/aa7d450d436db9)
GKE上にAirbyteを構築しSaaSデータ連携をリプレイスした話
Snowflake と Airbyte ではじめるモダンデータスタックへの道
Helmを使用せずにKustomizeでAirbyteをKubernetes上に構築する

でも、コネクタビルダーでやってる記事みかけないよな…。

Airbyte No Code Connector Builder

image.png

こんなコネクタービルダーがついててAPI系なら割と簡単にデータ取り込みができます。
公式チュートリアル

情報が公式チュートリアルしか碌なのがない…。。。

やることは、APIのURL指定して、APIキーなどの情報をいれて(実際にいれるのはソースとして実体化するタイミング。設定を書くだけ)

image.png

エンドポイント指定して、主キー指定して、レコードセレクターでどこに「テーブル」となるデータが入ってるか指定して
image.png

必要ならクエリパラメーターとか色々かいて、ページ送り設定して(ここが今回の肝)
image.png

日付で更新分をとるように指定します。
image.png

日付のところ、Range以外にもあるんだけど、その日付以降全部取るみたいな前提になっててハンドリングが悪かったのでRangeをFastAPIで作ったわけです。で、next_page_tokenで実際にとる日付を指定してやることで適度なバランス(一発16MBは全く適度でない)でデータを送り込めるようにしています。

元の仕様みたいに、メタデータなテーブルを親としてdocIDをとるのもAirbyteで実はできるんだけど、このあたりの増分更新が思った感じならなかったのでこうした。なんで全日付みるねん…。

ちなみに最終系ではとった日付の最後の日だけはダブってみよります。時間まで指定していない場合は最後の日は更新あるかもしれないのでデータとったうえで内部でチェックする仕様なんだとか。

あとはポチポチっとソースとコネクション設定してやると

image.png

image.png

image.png

image.png

自動でデータが更新できるわけです。
image.png

いやー簡単簡単。ありがたい世の中。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?