はじめに
この記事は続編になります
以下の記事もあわせて読んでいただけると幸いです
やりたいこと(再掲)
大きく分けて以下の3つです
-
雀魂の対局結果を取得
- できるだけ手動入力は避けたい
-
各種スタッツ、ポイントの計算
- 各対局の立直回数、和了回数、放銃回数の集計ができる
- 通算の立直率、和了率、放銃率、副露率、ポイントの集計ができる
- ポイントは、様々なルールに対応できるように
-
アクセスしやすい場所への格納
- 自分ひとりだけでなく、友人も含め簡単に見ることができる
このページでは
3. アクセスしやすい場所への格納
について記述します
アクセスしやすい場所への格納
さて、前回まででデータ部分の集計は完了しました
次に、これらの集計をどのように見せるか?という部分について考えていきます
-
ファイル編集側の要件
- 表形式
- だれでもアクセスできる
- 編集はできない
- 日々更新可能
-
ユーザー側の要件(雀魂と友人の環境)
- 雀魂
- PCではブラウザ、スマートフォン・タブレットではアプリで動く
- 友人
- コンシューマーゲームが主で、PCを所持していない
- 雀魂はスマホで行っている
- 雀魂
以上の情報から、このように設計しました
- データのクラウド格納:Googleスプレッドシート
- 結果の通知:LINE BOT
Googleスプレッドシートへの格納
雀魂友人戦の成績管理Botを作成した話➁で作成したcsvファイルをpythonでGoogleスプレッドシートへのアップロードも済ませてしまいます
通算の成績と、対局ごとの成績があるのでそれぞれ別シートで格納します
ただ、認証等を済ませる必要があって色々面倒でした
こちらのページを参考にGCPからGoogleDriveやスプレッドシートのAPIを取得し、以下のPythonファイルに情報を入れました
def save_to_google_sheets(df, sheet_name):
"""
DataFrameをGoogleスプレッドシートに保存する
:param df: 保存するDataFrame
:param spreadsheet_key: スプレッドシートのキー
:param worksheet_name: 保存先のワークシート名
"""
try:
# 認証情報設定
## 各自で設定
scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive']
credentials = ServiceAccountCredentials.from_json_keyfile_name('', scope)
gc = gspread.authorize(credentials)
#共有設定したスプレッドシートキーを変数[SPREADSHEET_KEY]に格納する。
SPREADSHEET_KEY = ''
#共有設定したスプレッドシートのシートを上書き保存(削除して新規作成)
try:
worksheet = gc.open_by_key(SPREADSHEET_KEY).worksheet(sheet_name)
gc.open_by_key(SPREADSHEET_KEY).del_worksheet(worksheet)
except Exception as e:
pass
gc.open_by_key(SPREADSHEET_KEY).add_worksheet(title=sheet_name, rows="1000", cols="20")
worksheet = gc.open_by_key(SPREADSHEET_KEY).worksheet(sheet_name)
# DataFrameをスプレッドシートに書き込む
set_with_dataframe(worksheet, df)
# print(f"データフレームをスプレッドシートに保存しました。")
except Exception as e:
print(f"Googleスプレッドシートへの保存中にエラーが発生しました: {e}")
こちらをcsvの保存後に実行させればGoogleスプレッドシートへのアップロードは完了です!
GCPですが、この程度の操作であればお金はかかりません!
LINE BOT
もちろんGoogleスプレッドシートのリンクだけでも、ほかのプレイヤーに成績を共有できますが、PC以外では使い勝手が悪いので、いつもやり取りしているグループLINEにBOTを導入することにしました
Discordでもよかったのですが、今回はLINE BOTにしました
LINE BOTはLINEのアカウントに加えてビジネスアカウントというものを登録すれば作成できます
この辺りは、LINE Developerがかなり親切に書いていましたので、読めばわかると思います
さて、LINE BOT作成後は、このGoogleスプレッドシートとLINEBOTの接続になります
Google スプレッドシートとLINE BOTの接続
いままで、ローカルに保存したデータを入力としたPythonを動かして、Googleスプレッドシートまでアップロードできましたが、LINE BOTにつなげるのは、Pythonでは自分の調べた限りできないようでした!
接続に繋ぐのはいわゆるGAS(Google Apps Script)と呼ばれるものになります
JavaScriptで動くらしく、あまり書いたことのない自分はほぼ生成AIに頼りました。。。
ただ、やりたいことはスプレッドシートから任意の値を取得して、LINEでいい感じに送るというものでしたので、そこまで難しくなかったです!
以下参考までに
const CHANNEL_ACCESS_TOKEN = "";
function doPost(e) {
const json = JSON.parse(e.postData.contents);
const replyToken = json.events[0].replyToken;
// メッセージタイプが'text'でない場合の処理を追加 (任意だが推奨)
if (json.events[0].type !== 'message' || json.events[0].message.type !== 'text') {
return ContentService.createTextOutput("Not a text message.");
}
const userMessage = json.events[0].message.text;
const sheetId = ''; // スプレッドシートID
const sheetNameMap = {
"総合成績": "総合成績",
"試合成績": "試合成績"
};
// 未対応キーワードは返信しない
if (!(userMessage in sheetNameMap)) {
return ContentService.createTextOutput("OK");
}
if (userMessage in sheetNameMap) {
const ss = SpreadsheetApp.openById(sheetId);
const sheet = ss.getSheetByName(sheetNameMap[userMessage]);
if (sheet) {
let replyMessages = []; // 各行の整形済みメッセージを格納する配列
// ヘッダー行(1行目)から必要なカラムのインデックスを取得
const headers = sheet.getRange(1, 1, 1, sheet.getLastColumn()).getValues()[0];
if (userMessage === "試合成績") {
const lastRow = sheet.getLastRow();
// Math.min を使用し、ヘッダー行を除いたデータ行数を考慮
const readRowCount = Math.min(4, lastRow - 1);
if (readRowCount <= 0) { // データ行がない場合
replyText = "表示できるデータがありません。";
} else {
const range = sheet.getRange(2, 1, readRowCount, sheet.getLastColumn()); // ヘッダー行を除いてデータを取得
const values = range.getValues();
// インデックスの定義(ループの外側)
const userIndex = headers.indexOf('ユーザー名');
const scoreIndex = headers.indexOf('ポイント');
const rankIndex = headers.indexOf('順位');
const totalGamesIndex = headers.indexOf('総局数');
const reachIndex = headers.indexOf('立直回数');
const winIndex = headers.indexOf('和了回数');
const dealInIndex = headers.indexOf('放銃回数');
values.forEach(row => {
if (userIndex !== -1 && scoreIndex !== -1 && rankIndex !== -1 &&
totalGamesIndex !== -1 && reachIndex !== -1 && winIndex !== -1 && dealInIndex !== -1) {
const message =
"---" +
"\n【対局結果】" +
"\nユーザー名:" + row[userIndex] +
"\nポイント:" + row[scoreIndex] +
"\n順位:" + row[rankIndex] + "位" +
"\n総局数:" + row[totalGamesIndex] +
"\n立直:" + row[reachIndex] + "回" +
"\n和了:" + row[winIndex] + "回" +
"\n放銃:" + row[dealInIndex] + "回";
replyMessages.push(message);
}
});
// 配列を結合して文字列としてreplyTextに代入
replyText = replyMessages.join('\n---\n');
}
} else if (userMessage === "総合成績") {
const data = sheet.getDataRange().getValues(); // データ全体を取得
// インデックスの定義(ループの外側)
const userIndex = headers.indexOf('ユーザー名');
const gameIndex = headers.indexOf('試合数');
const roundIndex = headers.indexOf('総局数');
const sumpointIndex = headers.indexOf('ポイント');
const averankIndex = headers.indexOf('平均着順');
const reachrateIndex = headers.indexOf('立直率(%)');
const winrateIndex = headers.indexOf('和了率(%)');
const dealrateInIndex = headers.indexOf('放銃率(%)');
const callrateInIndex = headers.indexOf('副露率(%)');
// ヘッダー行を除くデータのみをループ
// values.forEach(row => { // ★元のコードにあった誤り
data.slice(1).forEach(row => { // ★修正: data.slice(1)でヘッダー行をスキップし、dataを使用
if (userIndex !== -1 && gameIndex !== -1 && roundIndex !== -1 && sumpointIndex !== -1 &&
averankIndex !== -1 && reachrateIndex !== -1 && winrateIndex !== -1 && dealrateInIndex !== -1 && callrateInIndex !== -1) {
const message =
"---" +
"\n【総合成績】" +
"\nユーザー名:" + row[userIndex] +
"\n試合数:" + row[gameIndex] +
"\n総局数:" + row[roundIndex] +
"\nポイント:" + row[sumpointIndex] +
"\n平均着順:" + row[averankIndex] +
"\n立直率:" + row[reachrateIndex] + "%" +
"\n和了率:" + row[winrateIndex] + "%" +
"\n放銃率:" + row[dealrateInIndex] + "%" +
"\n副露率:" + row[callrateInIndex] + "%";
replyMessages.push(message);
}
});
// 配列を結合して文字列としてreplyTextに代入
replyText = replyMessages.join('\n---\n');
}
} else {
replyText = "シートが見つかりません。";
}
}
// formatSheetData(data) の呼び出しは不要なので削除済み
// LINEへ返信
UrlFetchApp.fetch("https://api.line.me/v2/bot/message/reply", {
"method": "post",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer " + CHANNEL_ACCESS_TOKEN
},
"payload": JSON.stringify({
"replyToken": replyToken,
"messages": [{
"type": "text",
"text": replyText
}]
})
});
return ContentService.createTextOutput("OK");
}
おわりに
実際の運用画面を見せておくとこんな感じになりました!
実際の対局後に成績が反映されるまでのフローとしては
- 牌譜URLを取得(自分が参加していない対局は共有してもらう)
- こちらの手順で牌譜のダウンロード
- Pythonファイルの実行
の3ステップになっており、対局中の待ち時間でもすぐに反映可能な設計にすることができました!
今回は主な成績のみを集計しましたが、着順の内訳や和了時打点や放銃時打点など詳細な情報も集計できると思うので、改良の余地はまだまだあると思います!
最後まで読んでいただきありがとうございました!
Appendixとして、今回作成に利用したPythonコードを全公開します
どなたかの参考になればうれしいです!
Appendix(コード全編)
Python
import json
import pandas as pd
from collections import defaultdict
import os
import csv
import gspread
from gspread_dataframe import set_with_dataframe
#ServiceAccountCredentials:Googleの各サービスへアクセスできるservice変数を生成します。
from oauth2client.service_account import ServiceAccountCredentials
# --- 各種統計を保持するデータ構造 ---
# --- ユーティリティ関数 ---
def load_json(file_path):
file_path = os.path.join("paifu_txt", file_path)
"""JSONファイルを読み込む"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
if not content.strip(): # ファイルが空の場合
raise ValueError(f"ファイル {file_path} は空です。")
return json.loads(content)
except json.JSONDecodeError as e:
raise ValueError(f"ファイル {file_path} のJSON形式が不正です: {e}")
def load_existing_data(file_path):
"""既存のCSVデータを読み込む"""
try:
return pd.read_csv(file_path)
except FileNotFoundError:
return pd.DataFrame() # ファイルが存在しない場合は空のデータフレームを返す
def process_furo_flags(furo_flag, user_stats):
"""副露フラグを確認して副露回数を加算し、フラグをリセットする"""
for i in range(4):
if furo_flag[i]:
user_stats[i]["furo_games"] += 1
return [False] * 4 # フラグをリセット
def process_new_round(user_stats, furo_flag):
"""局開始時の処理"""
for i in range(4):
user_stats[i]["total_games"] += 1
return process_furo_flags(furo_flag, user_stats)
def process_discard_tile(result_value, user_stats):
"""打牌時の処理"""
seat = result_value.get("data", {}).get("seat")
if result_value.get("data", {}).get("is_liqi"):
user_stats[seat]["riichi_count"] += 1
return seat
def process_hule(result_value, user_stats, last_discard_seat):
"""上がり時の処理"""
hules = result_value.get("data", {}).get("hules", [])
for hule in hules:
agari_seat = hule.get("seat")
user_stats[agari_seat]["agari_count"] += 1
if not hule.get("zimo") and last_discard_seat is not None:
user_stats[last_discard_seat]["houjuu_count"] += 1
return None # 上がり後にリセット
def seat_to_seat(seat_number):
"""座席番号を整数に変換する関数"""
seat = ['東', '南', '西', '北']
return seat[seat_number]
def extract_data(input_file, output_file):
try:
# 既存のデータを読み込む
existing_data = load_existing_data(output_file)
existing_uuids = set(existing_data['uuid']) if not existing_data.empty else set()
user_stats = defaultdict(lambda: {
"total_games": 0,
"riichi_count": 0,
"houjuu_count": 0,
"furo_games": 0,
"agari_count": 0
})
# 新しいデータを読み込む
data = load_json(input_file)
new_uuid = data.get("head", {}).get("uuid", "")
# UUIDが既存データに含まれている場合はスキップ
if new_uuid in existing_uuids:
print(f"UUID {new_uuid} は既に存在しています。処理をスキップします。")
return
accounts = data.get("head", {}).get("accounts", [])
result = data.get("head", {}).get("result", []).get("players", [])
data_section = data.get("data", {}).get("data", {}).get("actions", [])
furo_flag = [False] * 4
last_discard_seat = None
if data_section:
# データセクションの処理
for action_item in data_section:
result_value = action_item.get("result")
if result_value:
name = result_value.get("name")
if name == '.lq.RecordNewRound':
furo_flag = process_new_round(user_stats, furo_flag)
elif name == '.lq.RecordDiscardTile':
last_discard_seat = process_discard_tile(result_value, user_stats)
elif name == '.lq.RecordHule':
last_discard_seat = process_hule(result_value, user_stats, last_discard_seat)
elif name == '.lq.RecordChiPengGang':
furo_seat = result_value.get("data", {}).get("seat")
furo_flag[furo_seat] = True
furo_flag = process_furo_flags(furo_flag, user_stats)
furo_flag = [False] * 4
last_discard_seat = None
# accountsデータを取得
accounts_df = pd.DataFrame(accounts)
accounts_df = accounts_df[['account_id', 'nickname', 'seat']]
# resultデータを取得
result_df = pd.DataFrame(result)
result_df = result_df[['seat', 'part_point_1']]
result_df['rank'] = result_df['part_point_1'].rank(method='min', ascending=False).astype(int)
# 順位を計算(同率順位、スキップあり)
result_df['rank'] = result_df['part_point_1'].rank(method='min', ascending=False).astype(int)
# 順位ごとの得点テーブル
point_table = {
1: 50000,
2: 10000,
3: -10000,
4: -30000
}
# 各順位に対して、同着順位の平均得点を割り振る関数
def get_adjusted_point(rank_series):
adjusted_points = []
for rank in rank_series:
# 同じ順位の人の数
same_ranks = rank_series[rank_series == rank]
# 同着した順位の範囲(例:2人で2位→rank: 2,3)
tied_ranks = list(range(rank, rank + len(same_ranks)))
# 対応する点数(ない順位は0点)
scores = [point_table.get(r, 0) for r in tied_ranks]
# 平均して加点
avg_score = sum(scores) / len(scores)
adjusted_points.append(avg_score)
return adjusted_points
# 順位点の加点(同率処理込み)
result_df['rank_bonus'] = get_adjusted_point(result_df['rank'])
# 30000点を引いて、1000点=1ptとして、小数第1位に変換
result_df['converted_point'] = round(((result_df['part_point_1'] + result_df['rank_bonus'] - 30000) / 1000), 1)
# user_statsをデータフレームに変換
user_stats_df = pd.DataFrame.from_dict(user_stats, orient='index').reset_index()
user_stats_df.rename(columns={'index': 'seat'}, inplace=True)
# accounts_df, result_df, user_stats_dfを結合
merge_df = accounts_df.merge(result_df, on='seat', how='inner')
merge_df = merge_df.merge(user_stats_df, on='seat', how='inner')
# UUIDを追加
merge_df['uuid'] = new_uuid
merge_df['seat'] = merge_df['seat'].apply(seat_to_seat)
# 必要な列を選択
merge_df = merge_df[['uuid', 'account_id', 'nickname', 'seat', 'converted_point', 'rank',
'total_games', 'riichi_count', 'agari_count', 'houjuu_count', 'furo_games']]
# 既存データに新しいデータを追加
if not existing_data.empty:
merge_df = pd.concat([existing_data, merge_df], ignore_index=True)
# CSV形式で保存
merge_df.sort_values('uuid', ascending=False).to_csv(output_file, index=False)
except Exception as e:
print(f"エラーが発生しました: {e}")
def save_to_google_sheets(df, sheet_name):
"""
DataFrameをGoogleスプレッドシートに保存する
:param df: 保存するDataFrame
:param spreadsheet_key: スプレッドシートのキー
:param worksheet_name: 保存先のワークシート名
"""
try:
# 認証情報設定
scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive']
credentials = ServiceAccountCredentials.from_json_keyfile_name('.json', scope)
gc = gspread.authorize(credentials)
#共有設定したスプレッドシートキーを変数[SPREADSHEET_KEY]に格納する。
SPREADSHEET_KEY = ''
#共有設定したスプレッドシートのシートを上書き保存(削除して新規作成)
try:
worksheet = gc.open_by_key(SPREADSHEET_KEY).worksheet(sheet_name)
gc.open_by_key(SPREADSHEET_KEY).del_worksheet(worksheet)
except Exception as e:
pass
gc.open_by_key(SPREADSHEET_KEY).add_worksheet(title=sheet_name, rows="1000", cols="20")
worksheet = gc.open_by_key(SPREADSHEET_KEY).worksheet(sheet_name)
# DataFrameをスプレッドシートに書き込む
set_with_dataframe(worksheet, df)
# print(f"データフレームをスプレッドシートに保存しました。")
except Exception as e:
print(f"Googleスプレッドシートへの保存中にエラーが発生しました: {e}")
def calculate_user_stats(input_file, output_file, user_ids):
"""
指定されたユーザーの成績を集計し、CSVに保存する
:param input_file: 元のCSVファイルパス
:param output_file: 集計結果を保存するCSVファイルパス
:param user_ids: 集計対象のユーザーIDリスト
"""
try:
# 元のCSVを読み込む
df = pd.read_csv(input_file)
# 集計処理
grouped = df.groupby('account_id').agg({
'uuid': 'count', # 対局数
'nickname': 'first', # ニックネーム
'total_games': 'sum', # 総対局数
'riichi_count': 'sum', # 立直回数
'agari_count': 'sum', # 上がり回数
'houjuu_count': 'sum', # 放銃回数
'furo_games': 'sum', # 副露した局数
'converted_point': 'sum', # ポイントの合計
'rank': 'mean' # 平均順位
}).reset_index()
# 立直率、上がり率、放銃率、副露率を計算
grouped['riichi_rate'] = (grouped['riichi_count'] / grouped['total_games'] * 100).round(2)
grouped['agari_rate'] = (grouped['agari_count'] / grouped['total_games'] * 100).round(2)
grouped['houjuu_rate'] = (grouped['houjuu_count'] / grouped['total_games'] * 100).round(2)
grouped['furo_rate'] = (grouped['furo_games'] / grouped['total_games'] * 100).round(2)
grouped['converted_point'] = round(grouped['converted_point'], 1)
grouped['rank'] = grouped['rank'].round(2) # 平均順位を小数点以下2桁に丸める
# 総局数を追加
grouped['total_rounds'] = grouped['uuid']
# 必要な列を選択
grouped = grouped[['account_id', 'nickname', 'total_games', 'total_rounds', 'converted_point', 'rank',
'riichi_rate', 'agari_rate', 'houjuu_rate', 'furo_rate']]
grouped.rename(columns={'account_id': 'アカウントID', 'nickname': 'ユーザー名', 'total_rounds': '試合数',
'total_games': '総局数', 'converted_point': 'ポイント', 'rank': '平均着順',
'riichi_rate': '立直率(%)', 'agari_rate': '和了率(%)',
'houjuu_rate': '放銃率(%)', 'furo_rate': '副露率(%)'}, inplace=True)
# CSV形式で保存
grouped.to_csv(output_file, index=False, encoding='utf-8-sig')
# 指定されたユーザーIDのみをフィルタリング
grouped = grouped[grouped['アカウントID'].isin(user_ids)]
# Google Spread Sheetsに保存
save_to_google_sheets(grouped, "総合成績")
df.rename(columns={'uuid': '対局ID', 'account_id': 'アカウントID', 'nickname': 'ユーザー名',
'total_games': '総局数', 'converted_point': 'ポイント', 'rank': '順位',
'riichi_count': '立直回数', 'agari_count': '和了回数',
'houjuu_count': '放銃回数', 'furo_count': '副露局数'}, inplace=True)
df = df.drop('seat', axis=1)
# 試合の成績を保存
save_to_google_sheets(df, "試合成績")
print(f"指定されたユーザーの成績を {output_file} に保存しました。")
except Exception as e:
print(f"エラーが発生しました: {e}")
# --- エントリーポイント ---
if __name__ == "__main__":
directory_path = "paifu_txt"
files = os.listdir(directory_path)
output_file = "extracted_data.csv"
for file in files:
input_file = file
# データ抽出
extract_data(input_file, output_file)
input_file = "extracted_data.csv" # 元のCSVファイル
output_file = "user_stats.csv"
user_ids = [] # 集計対象のユーザーIDリスト
# 成績計算
calculate_user_stats(input_file, output_file, user_ids)