0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

企業の暗号資産会計:オンチェーン×取引所の多ソース突合をSQLiteで最小実装

Posted at

926f65a3-fb42-43b9-97b9-422b6ad42656.png

TL;DR

txid一致→時間窓×金額×アドレス一致の優先度で突合し、**二重仕訳+監査ログ(ハッシュ鎖)**まで自動生成します。

依存は標準ライブラリ+SQLiteのみ。1ファイル・数十行で動くため、会計×データ基盤の**“再現可能性”**を担保できます。

未突合はサスペンス勘定に自動送客し、オンチェーン照合率をKPIとして可視化します(会計・監査・CSの共通言語)。

背景と狙い

企業で暗号資産(例:ETH/USDC)を保有・送受金する際、オンチェーン(ウォレット)と取引所(交換業者)で二重管理が発生します。
**どちらが正?という議論ではなく、「同じ事象を互いに説明できるか」**が監査観点では重要です。この記事では、最小構成で多ソース突合→仕訳→監査ログまでを一気通貫で再現します。

全体像(1枚図)
flowchart LR
A[オンチェーンTx] -->|staging| S[正規化]
B[取引所レジャー] -->|staging| S
C[価格フィード(JPY)] -->|日次結合| V[評価]
S --> R[突合エンジン
①txid ②時間窓+金額+アドレス]
R --> J[仕訳生成
101-1/101-2/費用/サスペンス]
J --> H[監査ログ
改ざん耐性(前ハッシュ鎖)]
R --> U[未突合リスト
差異是正/エスカレーション]
V --> J

仕様(最小)

データソース

onchain_tx(txid, ts, from_addr, to_addr, asset, amount)

exchange_ledger(id, ts, side[deposit|withdraw], asset, amount, txid?, from_addr?, to_addr?, fee_asset?, fee_amount?)

price_feed(d, asset, jpy) … 評価用のシンプルな日足

address_map(raw_addr, label, kind[company|exchange|counterparty])

突合ロジック(優先度)

強一致:exchange_ledger.txid = onchain_tx.txid

時間窓一致:金額一致±ε、方向(入出庫)×アドレス整合、±10分以内

会計ロジック(例)

自己保管⇄取引所:101-1 自己保管 と 101-2 取引所保管 の振替

交換業者手数料:601-1 手数料 / 101-2

外部送金・受領(未分類):199-9 サスペンス(出) / 499-9 サスペンス(入)

監査ログ:仕訳行を時系列に連鎖ハッシュ(sha256(行内容||prev_hash))

実装(コピペで実行)

依存:Python3(標準ライブラリのみ)
実行:python3 reconcile_demo.py

reconcile_demo.py

import sqlite3, hashlib

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript("""
PRAGMA foreign_keys=ON;

-- 1) スキーマ
CREATE TABLE onchain_tx (
txid TEXT PRIMARY KEY, ts TEXT, chain TEXT,
from_addr TEXT, to_addr TEXT, asset TEXT, amount REAL
);
CREATE TABLE exchange_ledger (
id TEXT PRIMARY KEY, ts TEXT, exch TEXT, side TEXT,
asset TEXT, amount REAL, txid TEXT, from_addr TEXT, to_addr TEXT,
fee_asset TEXT, fee_amount REAL
);
CREATE TABLE price_feed (d TEXT, asset TEXT, jpy REAL, PRIMARY KEY(d, asset));
CREATE TABLE address_map (raw_addr TEXT PRIMARY KEY, label TEXT, kind TEXT);

-- 2) サンプルデータ(最小)
""")

onchain = [
("0xaaa","2025-09-01 10:00:00","ethereum","0xSelf1","0xExDeposit1","USDC",1000.0),
("0xbbb","2025-09-01 11:01:00","ethereum","0xExHot1","0xSelf1","USDC",500.0),
("0xccc","2025-09-01 12:00:00","ethereum","0xSelf1","0xVendorA","ETH",1.2),
("0xddd","2025-09-01 13:05:00","ethereum","0xCustomerA","0xSelf2","USDC",2000.0),
]
ex_ledger = [
("E1","2025-09-01 10:05:00","Binance","deposit","USDC",1000.0,"0xaaa","0xSelf1","0xExDeposit1",None,0.0),
("E2","2025-09-01 11:00:00","Binance","withdraw","USDC",500.0,"0xbbb",None,"0xSelf1","USDC",1.0),
]
price_feed = [("2025-09-01","USDC",150.0), ("2025-09-01","ETH",450000.0)]
addr_map = [
("0xSelf1","Treasury Wallet 1","company"),
("0xSelf2","Treasury Wallet 2","company"),
("0xExDeposit1","Exchange Deposit","exchange"),
("0xExHot1","Exchange Hot","exchange"),
("0xVendorA","Vendor A","counterparty"),
("0xCustomerA","Customer A","counterparty"),
]

cur.executemany("INSERT INTO onchain_tx VALUES (?,?,?,?,?,?,?)", onchain)
cur.executemany("INSERT INTO exchange_ledger VALUES (?,?,?,?,?,?,?,?,?,?,?)", ex_ledger)
cur.executemany("INSERT INTO price_feed VALUES (?,?,?)", price_feed)
cur.executemany("INSERT INTO address_map VALUES (?,?,?)", addr_map)

3) 突合ビュー(txid一致→時間窓一致)

cur.executescript("""
DROP VIEW IF EXISTS match_by_txid;
CREATE VIEW match_by_txid AS
SELECT e.id AS ex_id, e.side, e.asset, e.amount AS ex_amt, e.ts AS e_ts,
o.ts AS o_ts, o.txid, o.from_addr, o.to_addr, o.amount AS on_amt
FROM exchange_ledger e
JOIN onchain_tx o ON e.txid IS NOT NULL AND e.txid = o.txid;

DROP VIEW IF EXISTS candidates_by_window;
CREATE VIEW candidates_by_window AS
SELECT e.id AS ex_id, e.side, e.asset, e.amount AS ex_amt, e.ts AS e_ts,
o.ts AS o_ts, o.txid, o.from_addr, o.to_addr, o.amount AS on_amt,
ABS(strftime('%s', e.ts) - strftime('%s', o.ts)) AS dt_sec,
CASE WHEN e.side='deposit' AND lower(o.to_addr)=lower(e.to_addr) THEN 1
WHEN e.side='withdraw' AND lower(o.from_addr)=lower(e.to_addr) THEN 1
ELSE 0 END AS addr_match
FROM exchange_ledger e
JOIN onchain_tx o
ON e.txid IS NULL AND e.asset = o.asset
AND ABS(o.amount - e.amount) <= 0.000001;

DROP VIEW IF EXISTS match_by_window;
CREATE VIEW match_by_window AS
SELECT * FROM candidates_by_window WHERE addr_match=1 AND dt_sec <= 600;

DROP TABLE IF EXISTS matches;
CREATE TABLE matches AS
SELECT ex_id, side, asset, ex_amt, e_ts, o_ts, txid, from_addr, to_addr, on_amt, 'txid' AS method
FROM match_by_txid
UNION ALL
SELECT ex_id, side, asset, ex_amt, e_ts, o_ts, txid, from_addr, to_addr, on_amt, 'window' AS method
FROM match_by_window;

DROP VIEW IF EXISTS unmatched_exchange;
CREATE VIEW unmatched_exchange AS
SELECT e.* FROM exchange_ledger e LEFT JOIN matches m ON m.ex_id = e.id WHERE m.ex_id IS NULL;

DROP VIEW IF EXISTS company_wallets;
CREATE VIEW company_wallets AS SELECT raw_addr FROM address_map WHERE kind='company';

DROP VIEW IF EXISTS unmatched_onchain;
CREATE VIEW unmatched_onchain AS
SELECT o.* FROM onchain_tx o LEFT JOIN matches m ON m.txid = o.txid
WHERE m.txid IS NULL AND (
lower(o.from_addr) IN (SELECT lower(raw_addr) FROM company_wallets) OR
lower(o.to_addr) IN (SELECT lower(raw_addr) FROM company_wallets)
);

-- 4) 評価ビュー
DROP VIEW IF EXISTS v_matches_valued;
CREATE VIEW v_matches_valued AS
SELECT m.*, DATE(m.o_ts) AS d, p.jpy AS px_jpy, m.on_amt * p.jpy AS jpy_amount
FROM matches m LEFT JOIN price_feed p ON p.d = DATE(m.o_ts) AND p.asset = m.asset;

-- 5) 仕訳テーブル
DROP TABLE IF EXISTS journal;
CREATE TABLE journal (
j_id TEXT PRIMARY KEY, ts TEXT, debit_acct TEXT, credit_acct TEXT,
asset TEXT, qty REAL, jpy_amount REAL, memo TEXT, link_txid TEXT, link_ex_id TEXT
);
""")

def acct(code, name):
return f"{code} {name}"

5-1) 突合済(自己⇄取引所)の仕訳 + 取引所手数料

for (ex_id, side, asset, ex_amt, e_ts, o_ts, txid, from_addr, to_addr, on_amt, method, d, px, jpy) in
cur.execute("SELECT * FROM v_matches_valued"):
if side == 'deposit':
debit, credit = acct("101-2","Digital Asset - Exchange"), acct("101-1","Digital Asset - Self-custody")
else: # withdraw
debit, credit = acct("101-1","Digital Asset - Self-custody"), acct("101-2","Digital Asset - Exchange")
cur.execute("INSERT INTO journal VALUES (?,?,?,?,?,?,?,?,?,?)",
(f"J-{ex_id}", o_ts, debit, credit, asset, on_amt, jpy,
f"{asset} {side} matched by {method} (ex_id={ex_id}, txid={txid})", txid, ex_id))
# fee(あれば)
fee_asset, fee_amt = cur.execute("SELECT fee_asset, fee_amount FROM exchange_ledger WHERE id=?", (ex_id,)).fetchone()
if fee_amt:
fee_px = cur.execute("SELECT jpy FROM price_feed WHERE d=? AND asset=?", (d, fee_asset)).fetchone()[0]
cur.execute("INSERT INTO journal VALUES (?,?,?,?,?,?,?,?,?,?)",
(f"J-{ex_id}-FEE", e_ts,
acct("601-1","Transaction Fee Expense"), acct("101-2","Digital Asset - Exchange"),
fee_asset, fee_amt, fee_amt*fee_px, f"{fee_asset} fee on exchange {ex_id}", txid, ex_id))

5-2) 未突合オンチェーン(外部⇄自社)をサスペンスへ

def is_company(addr):
return cur.execute("SELECT 1 FROM address_map WHERE lower(raw_addr)=lower(?) AND kind='company'", (addr,)).fetchone() is not None

for (txid, ts, chain, from_addr, to_addr, asset, amount) in cur.execute(
"SELECT txid, ts, chain, from_addr, to_addr, asset, amount FROM unmatched_onchain ORDER BY ts"):
d = ts.split(" ")[0]
px = cur.execute("SELECT COALESCE(jpy,0) FROM price_feed WHERE d=? AND asset=?", (d, asset)).fetchone()[0]
jpy = amount * px
if is_company(from_addr) and not is_company(to_addr): # 外部へ出金
debit, credit = acct("199-9","Suspense (Unclassified Outflow)"), acct("101-1","Digital Asset - Self-custody")
memo = f"{asset} onchain outflow to external (txid={txid})"
elif is_company(to_addr) and not is_company(from_addr): # 外部から入金
debit, credit = acct("101-1","Digital Asset - Self-custody"), acct("499-9","Suspense (Unclassified Inflow)")
memo = f"{asset} onchain inflow from external (txid={txid})"
else: # 内部/不明
debit, credit = acct("199-9","Suspense (Unclassified Outflow)"), acct("499-9","Suspense (Unclassified Inflow)")
memo = f"{asset} ambiguous movement (txid={txid})"
cur.execute("INSERT INTO journal VALUES (?,?,?,?,?,?,?,?,?,?)",
(f"J-{txid}", ts, debit, credit, asset, amount, jpy, memo, txid, None))

6) 監査ログ(ハッシュ鎖)

cur.executescript("DROP TABLE IF EXISTS audit_log; CREATE TABLE audit_log (seq INTEGER PRIMARY KEY AUTOINCREMENT, j_id TEXT, ts TEXT, hash TEXT, prev_hash TEXT);")
prev = ""
for row in cur.execute("SELECT j_id, ts, debit_acct, credit_acct, asset, qty, jpy_amount, IFNULL(link_txid,''), IFNULL(link_ex_id,'') FROM journal ORDER BY ts, j_id"):
s = "|".join(map(str,row)) + "|" + prev
h = hashlib.sha256(s.encode()).hexdigest()
cur.execute("INSERT INTO audit_log(j_id,ts,hash,prev_hash) VALUES (?,?,?,?)", (row[0],row[1],h,prev))
prev = h

7) KPI(突合率)

m_on_cnt, m_on_jpy = cur.execute("""
SELECT COUNT(), COALESCE(SUM(on_amt * p.jpy),0)
FROM matches m LEFT JOIN price_feed p ON p.d=DATE(m.o_ts) AND p.asset=m.asset
""").fetchone()
t_on_cnt, t_on_jpy = cur.execute("""
SELECT COUNT(
), COALESCE(SUM(o.amount * p.jpy),0)
FROM onchain_tx o LEFT JOIN price_feed p ON p.d=DATE(o.ts) AND p.asset=o.asset
""").fetchone()
print(f"[coverage] onchain: count {m_on_cnt}/{t_on_cnt}, JPY {int(m_on_jpy)}/{int(t_on_jpy)}")

8) ダンプ(学習用)

print("\n[journal]")
for r in cur.execute("SELECT ts, debit_acct, credit_acct, asset, qty, jpy_amount, memo FROM journal ORDER BY ts"): print(r)
print("\n[audit head]")
for r in cur.execute("SELECT seq, j_id, substr(hash,1,16) FROM audit_log LIMIT 5"): print(r)

9) 最低限の検証

assert m_on_cnt == 2, "突合件数が想定と異なります"
print("\nOK")

実行結果(例)

[coverage] onchain: count 2/4, JPY 225000/1065000

[journal]
('2025-09-01 10:00:00', '101-2 Digital Asset - Exchange', '101-1 Digital Asset - Self-custody', 'USDC', 1000.0, 150000.0, 'USDC deposit to exchange matched by txid (ex_id=E1, txid=0xaaa)')
('2025-09-01 11:00:00', '601-1 Transaction Fee Expense', '101-2 Digital Asset - Exchange', 'USDC', 1.0, 150.0, 'USDC fee on exchange E2')
('2025-09-01 11:01:00', '101-1 Digital Asset - Self-custody', '101-2 Digital Asset - Exchange', 'USDC', 500.0, 75000.0, 'USDC withdraw from exchange matched by txid (ex_id=E2, txid=0xbbb)')
('2025-09-01 12:00:00', '199-9 Suspense (Unclassified Outflow)', '101-1 Digital Asset - Self-custody', 'ETH', 1.2, 540000.0, 'ETH onchain outflow to external (txid=0xccc)')
('2025-09-01 13:05:00', '101-1 Digital Asset - Self-custody', '499-9 Suspense (Unclassified Inflow)', 'USDC', 2000.0, 300000.0, 'USDC onchain inflow from external (txid=0xddd)')

[audit head]
(1, 'J-E1', '488a59ec53567cbd')
(2, 'J-E2-FEE', '517085b95452e87f')
(3, 'J-E2', 'f53d42bd2851333a')
(4, 'J-0xccc', '96a44e152c6ec913')
(5, 'J-0xddd', '8f04df4040c3b927')

OK

設計ポイント(なぜこの形にしたか)

再現性:SQLite・標準ライブラリで完結。監査説明で「このコードで同じ結果が出る」を担保。

優先度付き突合:まずtxid、次に時間窓×金額×方向×アドレス。誤検出率と捕捉率のバランスを運用で調整可能。

監査ログ:連鎖ハッシュは「改ざんがあれば必ず痕跡が残る」を実演する最低限の方法。

運用Tips(スケール/監査/コスト)

スケール:本番はBigQuery/DuckDBを推奨。日付パーティション×assetクラスタでスキャン費用を抑制。

一意性:event_uid = source + primary_key を論理IDに、冪等(idempotent)なアップサートを徹底。

ルール外だが重要な実務

グロス/ネット表記の統一(手数料控除の扱い)。

評価レートの出所・時点(EOD/加重平均など)を会計方針で固定。

内部移転自動識別(address_map.kind='company'が両端にある場合は“社内振替”)。

KPI例:オンチェーン照合率(件数/金額)、サスペンス滞留日数、監査指摘件数。

よくある質問(短問短答)

Q. 取引所側のtxidが欠ける場合は?
A. 本稿の時間窓一致が効きます。窓幅と金額誤差εは運用で調整し、重複候補は最短時差優先で確定します。

Q. ガス代(ETH)と手数料(USDC等)が混在しますが?
A. 本稿では取引所手数料のみ実装。オンチェーン手数料は支払資産の減少として601-1/101-1の仕訳を追加してください。

Q. 会計基準(JGAAP/IFRS)差分は?
A. 科目や評価の方針は会社の会計方針に従って下さい。ここでは技術実装の再現性に絞っています。

Q. 制裁・リスクの検知は?
A. address_mapに制裁フラグを持たせ、未突合のうち高リスク宛先を優先エスカレーションするのが実務的です。

付録(dbt/SQLへの展開ヒント)

models/staging/:onchain_tx, exchange_ledger, price_feed, address_map

models/mart/matches.sql:本稿のmatchesをdbt化

models/mart/journal.sql:会計テンプレート(科目マッピング)をYAMLで外出し

tests/:突合件数/金額の期待値をdbt testsで自動検証

免責

本記事は一般的な技術情報の提供を目的とし、法務・会計・税務の助言ではありません。実際の手続・設定・会計処理は、社内規程・顧問専門家および最新の一次資料をご確認ください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?