#はじめに
Dota2というMOBAゲームの試合データの分析を行うため、データを自前で準備しました。
#やりたいこと
Dota2のデータ分析を行う上で、大量のデータが必要になります。
そこで、今回はPythonでデータ収集及び加工を行います。
データ分析基盤として、データレイク、データウェアハウスを用意することが目的です。
#つかったもの
-
Python 3.7.3
- OS:Windows 10
- condaにて環境構築
-
使用ライブラリ
-
dota2api
-
mysql.connector
-
joblib
-
pandas
-
numpy
#どうやったか
-
dota2api
を使用してValve社が提供するAPIから試合データを取得 - データ構造を把握し、DBを設計
- 取得したデータを加工、挿入してデータレイク、データウェアハウスを構築
- 並列処理で大量のデータの収集
##データの構造把握
Dota2の試合データはdota2api
というライブラリを使って取得できます。
import dota2api
api = dota2api.Initialise("YOUR_API_KEY")
APIのイニシャライズを行うため、ValveからAPIキーを取得する必要があります。
Steam IDが必要ですが、Dotaプレイヤーである皆さんは当然お持ちのことかと思います。
以降このInitialise
オブジェクトを使用し、データを取得していきます。
match = api.get_match_details(match_id=SOME_MATCH_ID)
Initialise
オブジェクトには色々なメソッドがあるのですが、まず今回は試合の詳細データが取得できるget_match_details
を使用します。
引数として取得したい試合のmatch_idが必要です。
この結果として、以下のような構造を持つデータがdict
形式で取得できます。
{
"players":[...],
"radiant_win":false,
"duration":1914,
"start_time":1342739723,
"match_id":27110133,
"match_seq_num":27106670,
"tower_status_radiant":4,
"tower_status_dire":1974,
"barracks_status_radiant":3,
"barracks_status_dire":63,
"cluster":131,,
"first_blood_time":133,
"lobby_type":0,
"human_players":10,
"leagueid":0,
"positive_votes":0,
"negative_votes":0,
"game_mode":0
}
"players"
はlist
になっていて、その中身として以下のような構造のdict
を持っています。
{
"account_id":75021757
"player_slot":0
"hero_id":45
"item_0":50
"item_1":79
"item_2":88
"item_3":36
"item_4":73
"item_5":73
"kills":1
"deaths":12
"assists":6
"leaver_status":0
"gold":345
"last_hits":98
"denies":2
"gold_per_min":261
"xp_per_min":304
"gold_spent":7185
"hero_damage":8270
"tower_damage":597
"hero_healing":39
"level":13
}
Dota2における分析に特に必要なのは、プレイヤーごとの試合後の結果を表す"players"
の中身だと判断できそうですね。
ですが、ここでは一旦全ての中身をデータレイク的役割を持つテーブル(以下データレイク)にしまっておくことにします。
生データを残しておくことで、後から他のデータが必要になった際、データレイクからパースし直すだけで済むという利点を持ちます。
##DB設計
次に上記のデータを収集するDBを設計していきます。
今回初めてDBを触るので、インターネット上に知見が多く存在するMySQLを使用します。
今回最低限必要になるテーブルとしては
- 生データを一旦しまっておくためのデータレイク
- 実際に分析に必要になるデータを整理して所持するためのデータウェアハウス
が考えられるので、それぞれについて定義をしていきます。
###データレイクの定義
今回は単純にget_match_details
で取得したdict
の最上層のKeyをカラムに取ります。
"players"
だけは「dict
を要素とするlist
」をValueとして持つので、後から中身を抽出しやすいようにjson
形式のカラムにしておきたいです。
そのために、"players"
だけはValueをそのまま挿入せず、{"players":[.....]}
のような形の使用を想定します。
また、match_idがユニークな値となるので、Primary Keyとして設定します。
生データと残すといいつつ、容量や明らかに必要のないデータなどの兼ね合いで、以下のようなイメージでテーブルを作製しました。
match_id | players | duration | lobby_name | radiant_win |
---|---|---|---|---|
INT | JSON | INT | STR | INT |
###データウェアハウスの定義
このテーブルの要素としては、"players"
の中身である、プレイヤーの試合後のデータが考えられます。
なのでカラムとしては"players"
のKeyを使用します。
属する試合のmatchidもカラムとして用意することで、依存関係を示すことができます。
また後述しますが、今回はroleの判定を行ったうえで分析を行いたいのでroleのカラムも用意します。
ユニークな値は特に存在しないため、idをauto incrementで記録してPrimal Keyとします。
また、matchidをもとにクエリを発行することを考慮し、match_idにインデックスを貼っておきます。
以下のようなイメージのテーブルを作製しました。
auto_increment_id | match_id | player_slot | ..... | ..... | role |
---|---|---|---|---|---|
INT | INT | INT | .... | .... | INT |
##実装
DBが設計出来たので、スクリプトを書いてデータを収集していきます。
実装したい処理としては
- 試合データを取得してデータレイクに収集
- データレイクから
"players"
の中身を取り出して整形してデータウェアハウスに収集 - なんらかのロジックでroleの判定を行ってデータウェアハウスに追加
以上のものが考えられます。
###試合データの収集
試合データをデータレイクに収集していきます。
import dota2api
api = dota2api.Initialise("YOUR_API_KEY")
まずはdota2api
をimportして、Initialise
オブジェクトを用意します。
import mysql.connector
cnx = mysql.connector.connect(user=USER,password=PASSWORD,host=HOST)
cursor = cnx.cursor(buffered=True)
次にmysql.connector
をimportし、cursor
オブジェクトを用意します。
USER,PASSWORD,HOSTにはMySQLで登録したユーザー情報を入力しておきます。
このcursor
オブジェクトを使用してDBと接続し、クエリの発行を行います。
import json
match = API.get_match_details(match_id=some_match_id)
players_json = json.dumps({"players":match['players']})
picks_bans_json = json.dumps({'picks_bans':match['picks_bans']})
Pythonの標準ライブラリであるjson
をimportしておきます。
some_match_id
には取得したい試合のmatch_idを入れます。
そしてちょっと無理やりですが、上記のような形でjson
で入れたいデータを作製します。
json.dumps()
はdict
を引数にとることでjson
形式の文字列を返すことができます。
insert_query = (
"""INSERT INTO `DATALAKE`
(`match_id`,`playerdata`,`duration`,`lobby_name`,`picks_bans`,`radiant_win`)
VALUES (%s,%s,%s,%s,%s,%s)"""
)
insert_data = (
match['match_id'],players_json,match['duration'],
match['lobby_name'],picks_bans_json,match['radiant_win']
)
発行するクエリとtuple
型のデータを上記のように準備します。
クエリ内の%s
と記載されている部分は、フォーマット指定子といい、データと合わせて下記記載の工程execute
を行うことで、文字列に置換を行うことができます。
このような記述をしておくことで、コードの可視性の向上、クエリ文の再利用などが可能になります。
クエリに記述してあるものと同じ順番でinsert_data
に挿入するデータを並べておく必要があります。
cursor.execute("USE YOUR_DATABASE")
cursor.execute(insert_query,insert_data)
cnx.commit()
cursor.execute()
では引数に文字列を入れることで、DBにクエリを発行することができます。
また引数にクエリとデータを用いることで、上記クエリ内のフォーマット指定子%s
がデータに用意した文字列で置換された上でMySQLにクエリが発行されます。
最後にcommit()
を行なわなければクエリの実行結果が保存されないので気を付けましょう。
これで1試合分のデータを用意することができました。
レコードのイメージは以下のような感じです。数値は適当です。
match_id | players | duration | lobby_name | radiant_win |
---|---|---|---|---|
4398348348 | {"players":[....]} | 7000 | Ranked | 1 |
次は先ほどjson で挿入したデータを分割し、データウェアハウスを作製します。 |
###データの分割
データレイクからplayers
を取り出し、データウェアハウスに挿入を行います。
getplayers_query = (r"""SELECT JSON_EXTRACT(`playerdata`,"$.players") FROM `DATALAKE` WHERE matchid = %s""")
getplayers_data = (some_match_id,)
cursor.execute(getplayers_query,getplayers_data)
今回も発行するクエリとデータをそれぞれ用意します。今回のデータの要素数は1つとなっていますが、その際に,
を最後に記載しないとtuple
にはならないので注意しましょう。
上記のコードではカラムplayerdata
のデータを取得するクエリを書いています。
some_match_id
には先ほど挿入したデータのmatchidを入れましょう。
MySQLではカラムのデータ型にjson
を指定することができます。その内部の値の検索を可能にするのがJSON EXTRACT
です。
JSON_EXTRACT(カラム,$.key)
といった形で利用します。指定したカラムを対象とし、指定したkeyで値の抽出を行い、そのvalueを出力します。
players
の中身は{'players:[.....]'}
の様な形になっているので、今回のクエリでは配列が返ってきます。
result = cursor.fetchone()
result_list = json.loads(result[0])
for i in range(10):
eachdata = result_list[i]
eachdata['match_id'] = some_match_id
player_query = (
"""INSERT INTO `DATAWAREHOUSE`
(`match_id`,`player_slot`,`hero_id`,`item_0`,`item_1`,`item_2`,`item_3`,
`item_4`,`item_5`,`backpack_0`,`backpack_1`,`backpack_2`,`kills`,`deaths`,
`assists`,`leaver_status`,`last_hits`,`denies`,`gold_per_min`,`xp_per_min`,
`level`,`hero_damage`,`hero_healing`,`tower_damage`)
VALUES
(%(match_id)s,%(player_slot)s,%(hero_id)s,%(item_0)s,%(item_1)s,%(item_2)s,
%(item_3)s,%(item_4)s,%(item_5)s,%(backpack_0)s,%(backpack_1)s,%(backpack_2)s,
%(kills)s,%(deaths)s,%(assists)s,%(leaver_status)s,%(last_hits)s,%(denies)s,
%(gold_per_min)s,%(xp_per_min)s,%(level)s,%(hero_damage)s,%(hero_healing)s,%(tower_damage)s)"""
)
cursor.execute(player_query,eachdata)
cnx.commit()
curos.fetchone()
でクエリの結果をtuple
で取得できます。今回は1行だけなので、要素も1つだけです。
json.loads()
は文字列を引数にとることで、Pythonで扱える型にデコードをしてくれます。
つまり今回でいえば、json.dumps
で変換前の「dict
を要素として持つlist
」を返します。
players
は1試合内のそれぞれのheroのデータとなるので要素数は10となります。
そのそれぞれに対して、データを取り出し、DBに挿入する処理を行います。
players
にはmatch_idの要素は含まれていないので、for文で取り出したdict
に追加しておきます。
先ほど発行したクエリと同じく、クエリ文とデータをわけて準備するのですが、先ほどは%s
としてある演算子が今回は%(...)s
となっていますね。
これはデータにdict
を用いる場合に使えるのですが、()内にKeyを指定することで、目的のValueに置換することができます。
今回はdict
のデータが既にあり、また挿入するカラムが多いので混乱を招かないためにこのような形を採用します。
あとはcursor.execute
でクエリを発行し、commit()
を行って完了です。
以下のようなイメージのレコードが挿入できました。数値は適当です。
auto_increment_id | match_id | player_slot | ..... | ..... | role |
---|---|---|---|---|---|
1 | 4398348348 | 0 | .... | .... | |
... | ... | ... | .... | .... | ... |
10 | 4398348348 | 132 | .... | .... |
###roleの判定
最後にデータウェアハウスに挿入したデータからroleの判定を行い、データウェアハウスに追加します。
roleの判定の仕方ですが、今回はシンプルにgpmが高い順番にpos1-5を決定していくことにします。
role_get_sql = ("SELECT `gold_per_min`,`player_slot` FROM `DATAWAREHOUSE` WHERE `match_id` =%s ORDER BY `player_slot`")
role_get_data = (some_match_id,)
cursor.execute(role_get_sql, role_get_data)
result = cursor.fetchall()
またまたクエリ文とデータを準備し、クエリを発行します。
今回は先ほど挿入したデータからgold_per_min
,player_slot
を取得します。
ORDER BY
を指定することで、並び順をplayer_slot
で昇順にソートしています。
player_slot
というのは1試合にいるプレイヤーの並び順を示します。
Radiantのプレイヤーは(0,1,2,3,4),Direのプレイヤーには(128,129,130,131,132)が与えられます。
これが1つの試合におけるそれぞれプレイヤーのユニークな値となります。
今回は先ほどと異なり、cursor.fetchall()
を使用してデータを取得します。
fetchone()
では1行のデータを取得するのに対し、fetchall()
では全行のデータを取得できます。
上記のクエリでは1試合に含まれる全プレイヤーのデータ、つまり全部で10行のデータを求めています。
それをまとめて取得することで、tuple
としてデータが返ってきます。
((some_gpm,0),(some_gpm,2),........,(some_gpm,131),(some_gpm,132))
上記のような要素を持つtuple
が取得できます。
radiant = (result[0], result[1], result[2], result[3], result[4])
dire = (result[5], result[6], result[7], result[8], result[9])
role_radiant = sorted(radiant, key=lambda x: x[0], reverse=True)
role_dire = sorted(dire, key=lambda x: x[0], reverse=True)
次に、radiant,dire陣営それぞれに関してtuple
にまとめます。
まとめたtuple
に対して、sorted
関数を使用し、gold_per_min
についてソートを行います。
sorted
は引数に入れたtuple
をソートした新たなオブジェクトを生成して返します。
radiant
,dire
各要素のtuple
の中身は(gold_per_min,playerslot)
となっているので、1番目の要素についてソートを行いたいですね。
sorted
は引数に関数をkeyとして渡すことで、その関数の戻り値を参照してソートを行ってくれます。
ここでは、lamda
式を使用して関数を定義します。
lamda
式とは lamda 引数 : 戻り値
といった形で定義される、名前を付けることなくその場で使用できる関数です。
今回は戻り値の部分にtuple
の1番目の要素を返す記述をすることで、ソート対象をgold_per_min
の値に指定できます。
role_list_radiant = []
role_list_dire = []
for i in range(4):
role_list_radiant.append(role_radiant.index(radiant[i]))
role_list_dire.append(role_dire.index(dire[i]))
role_list_radiant = list(map(lambda x:x+1,role_list_radiant))
role_list_dire = list(map(lambda x:x+1,role_list_dire))
それぞれの陣営ごとに空list
を作成します。
list
の持つindex
メソッドでそれぞれの要素が何番目にソートされたかが分かります。
それがそのままroleに繋がるのですが、この場合0-4のインデックスがそれぞれ与えられます。
実際の値としては1-5の範囲が欲しいので、map
関数を使用して加工をします。
map
関数はmap(関数,対象の配列)
の形をとることで配列に対して同じ関数でまとめて処理を行えます。
ここでもlamda
式を使用しlist
内全ての要素に1を加算し、pos1-5の表記に整えます。
role_insert_query = ("UPDATE `DATAWAREHOUSE` SET `role`=%s WHERE `match_id`=%s AND `player_slot`=%s")
role_insert_values = []
for i, role in zip(range(0,4),role_list_radiant):
each_tuple = (role,some_match_id,i)
role_insert_values.append(each_tuple)
for i, role in zip(range(128,133),role_list_dire):
each_tuple = (role,some_match_id,i)
role_insert_values.append(each_tuple)
cursor.executemany(role_insert_query,role_insert_values)
cnx.commit()
クエリ文、データを準備します。
今回のクエリはINSERT
ではなくUPDATE
を使用することで、既にデータが存在するレコードに対して更新が行えます。
データの準備に関してはzip
関数によって複数の要素に対してfor
文を回すことができます。
(role,match_id,player_slot)
といった要素を持つlist
を生成しています。
また今回はexecute
ではなくexecutemany
というメソッドを使用しています。
これはデータに配列を用いることで、複数のデータに対して同じクエリを次々と発行することができます。
1つ1つクエリを発行するよりもまとめて発行したほうが処理速度が速く、コードも短い行で済むのでこの形をとっています。
最後にcommit()
を行い完了です。
##並列処理でのデータ収集
上記では試合データ1件に対しての処理を実装しました。
ですが、実際にはもっとたくさんの件数のデータが分析には必要です。
そこで、先ほど実装した処理を関数化し、繰り返し処理していきます。
その速度向上のため、joblib
というライブラリを使用した並列処理を実装していきます。
from joblib import Parallel, delayed
joblib
からParallel
,delayed
をimportします。
def insert_into_datalake(some_match_id):
try:
#データレイクに入れる処理
except:
print("match_id {} not found".format(some_match_id))
まずは先ほどのAPIからデータを取得する処理を関数として定義します。
関数の引数としてmatch_idを設定することで、異なる複数の試合データを取得できるようにします。
ここで1つ注意ですが、Dota2には試合データが存在しないmatch_idというものがあります。
恐らくはpractice gameやmod gameなどにmatch_idが割り振られているため、データが取得できないというような感じだとは思いますが、とにかくデータを取得するのに失敗するmatch_idが存在します。
そこで、try
,except
を使用した例外処理を記述します。
これによりtry
以下の処理が失敗しエラーが出た場合、except
以下の処理が実行されることで、途中でエラーを出すことなく繰り返し処理の実行が可能になります。
def use_joblib_insert(startid,upto_number):
Parallel(n_jobs=4,verbose=10)([delayed(insert_into_datalake)(some_match_id) for some_match_id in range(startid,startid + upto_number)])
joblib
を使用した並列処理も関数化します。ここで使用しているjoblib
のメソッドを解説します。
delayed
はdelayed(関数)(引数)
の形をとり、(関数,引数)
の形のtuple
を返します。
Parallel
はParallel(オプション引数)(list)
の形をとり、list
内のdelayed
で生成されたtuple
について並列処理を実行します。
つまり上記関数で実行している処理は
-
delayed
でParallel
に与えるtuple
を生成 - 内包表記で上記
tuple
のlist
を生成 - 与えられたオプション引数、
list
を元に並列処理を実行
というフローになっています。
そしてオプション引数ですが、n_jobs
は並列処理を実行するコア数、verbose
はログを出力する頻度(10で最大,52以上で結果も出力)となっています。
並列数はもっとたくさん振ってもいいのですが、あまり早くリクエストを送りすぎると取得制限がかかってしまうため、控えめに設定してあります。
次にuse_joblib_insert
の引数の説明をします。
startid
は一番最初に取得したい試合のmatch_id,upto_number
はstartid
から何試合分までを取得するか、という内容になっています。
あるmatch_idから1000万件先まで取得したい場合は
use_joblib_insert(some_match_id,10000000)
といった形で実行します。
def player_splitter(some_match_id):
try:
#データ分割の処理
except:
print("failed to split data")
def role_hantei(some_match_id):
try:
#データ分割の処理
except:
print("failed to split data")
データ分割、role判定も同じように関数化した後joblib
で並列処理したいのですが、先ほどと実装が異なる点があります。
今回の場合、引数として与えるmatch_idは「データレイクに入れた試合データのmatch_id」であるはずです。
なので、引数として与えたいmatch_idをデータレイクから入手する必要があります。
import pandas as pd
import numpy as np
ここでPythonのライブラリ、pandas
とnumpy
をimportします。
pandas
はデータ操作や処理を、numpy
は数値計算を行うライブラリです。
conn = mysql.connector.connect(user=USER, password=PASSWORD, host=HOST,database=DATABASE)
get_match_id_sql = ("SELECT `match_id` from `DATALAKE`")
match_df = pd.read_sql(get_match_id_sql,conn)
pandas
用にmysql.connector
の接続オブジェクトを生成します。今回はキーワード引数にdatabase
を与えてあげています。
クエリ文の内容は、データレイクからmatch_idだけを全取得するというものです。
pd.read_sql(クエリ、接続オブジェクト)
を使用することで、クエリの結果が入ったpandas
のデータフレームオブジェクトが生成されます。
データフレームというのは行列データが入ったオブジェクトで、数々のメソッドを使用してデータ操作が容易に行えます。
今回の場合は列はmatch_idのみの1列、行はデータレイクに入れたレコード数だけの行数のデータとなっています。
match_df_arr = np.array(match_df['matchid'])
match_df_arr = map(int,match_df_arr)
np.array()
は引数にオブジェクトを与えることでNumPy配列を生成できます。
データフレームからlist
を生成することもできますが、サイズの大きい配列だとNumPy配列の方が処理が高速なのかなと考え、このような形にしてます。(未検証)
MySQLに入れられる型にするため、map
関数でint
型に変換しています。
Parallel(n_jobs=8,verbose=10)([delayed(player_splitter)(some_match_id) for some_match_id in match_df_arr])
あとはfor
文を回すオブジェクトに先ほど生成したmatch_df_arr
に指定することで処理を行います。
今回はリクエスト制限などは無いので、並列数は8に設定しています。
role判定に関しても同様に実装できますので、詳細は割愛します。
以上で並列処理による大量のデータ収集及び加工が実装できました。
##今後
これまでの手順でDota2用の分析のデータを集めることができました。
今後これを用いてデータ集計などによる分析を行う予定です。
またランダムフォレストによる特徴量決定、word2vecによるheroの類似度、あるいはアンチピックモデルの構築を考えています。
そして今回はローカルにデータを集約しましたが、スケールアウトの容易性やアクセシビリティなどを考えると、クラウド上にデータを集約したほうが良さそうです。
パッチ毎にデータを収集する必要性があることから、ワークフローを組んでデータを収集の自動化なども行いたいです。
なので今後のプランとして
-
Luigi
などのフレームワークを利用したワークフローの作成 - データレイクを
Cloud Firestore
,データウェアハウスをBigQuery
などを利用して構築
を実装していきたいと思います。