4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Dota2の分析をするためにPythonで分析基盤を構築した

Last updated at Posted at 2019-08-02

#はじめに
Dota2というMOBAゲームの試合データの分析を行うため、データを自前で準備しました。

#やりたいこと
Dota2のデータ分析を行う上で、大量のデータが必要になります。
そこで、今回はPythonでデータ収集及び加工を行います。
データ分析基盤として、データレイク、データウェアハウスを用意することが目的です。

#つかったもの

  • Python 3.7.3

    • OS:Windows 10
    • condaにて環境構築
  • 使用ライブラリ

  • dota2api

  • mysql.connector

  • joblib

  • pandas

  • numpy

#どうやったか

  • dota2apiを使用してValve社が提供するAPIから試合データを取得
  • データ構造を把握し、DBを設計
  • 取得したデータを加工、挿入してデータレイク、データウェアハウスを構築
  • 並列処理で大量のデータの収集

##データの構造把握
Dota2の試合データはdota2apiというライブラリを使って取得できます。

import dota2api
api = dota2api.Initialise("YOUR_API_KEY")

APIのイニシャライズを行うため、ValveからAPIキーを取得する必要があります。
Steam IDが必要ですが、Dotaプレイヤーである皆さんは当然お持ちのことかと思います。
以降このInitialiseオブジェクトを使用し、データを取得していきます。

match = api.get_match_details(match_id=SOME_MATCH_ID)

Initialiseオブジェクトには色々なメソッドがあるのですが、まず今回は試合の詳細データが取得できるget_match_detailsを使用します。
引数として取得したい試合のmatch_idが必要です。
この結果として、以下のような構造を持つデータがdict形式で取得できます。

{
"players":[...],
"radiant_win":false,
"duration":1914,
"start_time":1342739723,
"match_id":27110133,
"match_seq_num":27106670,
"tower_status_radiant":4,
"tower_status_dire":1974,
"barracks_status_radiant":3,
"barracks_status_dire":63,
"cluster":131,,
"first_blood_time":133,
"lobby_type":0,
"human_players":10,
"leagueid":0,
"positive_votes":0,
"negative_votes":0,
"game_mode":0
}

"players"listになっていて、その中身として以下のような構造のdictを持っています。

{
"account_id":75021757
"player_slot":0
"hero_id":45
"item_0":50
"item_1":79
"item_2":88
"item_3":36
"item_4":73
"item_5":73
"kills":1
"deaths":12
"assists":6
"leaver_status":0
"gold":345
"last_hits":98
"denies":2
"gold_per_min":261
"xp_per_min":304
"gold_spent":7185
"hero_damage":8270
"tower_damage":597
"hero_healing":39
"level":13
}

Dota2における分析に特に必要なのは、プレイヤーごとの試合後の結果を表す"players"の中身だと判断できそうですね。
ですが、ここでは一旦全ての中身をデータレイク的役割を持つテーブル(以下データレイク)にしまっておくことにします。
生データを残しておくことで、後から他のデータが必要になった際、データレイクからパースし直すだけで済むという利点を持ちます。

##DB設計
次に上記のデータを収集するDBを設計していきます。
今回初めてDBを触るので、インターネット上に知見が多く存在するMySQLを使用します。

今回最低限必要になるテーブルとしては

  • 生データを一旦しまっておくためのデータレイク
  • 実際に分析に必要になるデータを整理して所持するためのデータウェアハウス
    が考えられるので、それぞれについて定義をしていきます。

###データレイクの定義
今回は単純にget_match_detailsで取得したdictの最上層のKeyをカラムに取ります。
"players"だけは「dictを要素とするlist」をValueとして持つので、後から中身を抽出しやすいようにjson形式のカラムにしておきたいです。
そのために、"players"だけはValueをそのまま挿入せず、{"players":[.....]}のような形の使用を想定します。
また、match_idがユニークな値となるので、Primary Keyとして設定します。
生データと残すといいつつ、容量や明らかに必要のないデータなどの兼ね合いで、以下のようなイメージでテーブルを作製しました。

match_id players duration lobby_name radiant_win
INT JSON INT STR INT

###データウェアハウスの定義
このテーブルの要素としては、"players"の中身である、プレイヤーの試合後のデータが考えられます。
なのでカラムとしては"players"Keyを使用します。
属する試合のmatchidもカラムとして用意することで、依存関係を示すことができます。
また後述しますが、今回はroleの判定を行ったうえで分析を行いたいのでroleのカラムも用意します。
ユニークな値は特に存在しないため、idをauto incrementで記録してPrimal Keyとします。
また、matchidをもとにクエリを発行することを考慮し、match_idにインデックスを貼っておきます。
以下のようなイメージのテーブルを作製しました。

auto_increment_id match_id player_slot ..... ..... role
INT INT INT .... .... INT

##実装
DBが設計出来たので、スクリプトを書いてデータを収集していきます。
実装したい処理としては

  • 試合データを取得してデータレイクに収集
  • データレイクから"players"の中身を取り出して整形してデータウェアハウスに収集
  • なんらかのロジックでroleの判定を行ってデータウェアハウスに追加
    以上のものが考えられます。

###試合データの収集
試合データをデータレイクに収集していきます。

import dota2api
api = dota2api.Initialise("YOUR_API_KEY")

まずはdota2apiをimportして、Initialiseオブジェクトを用意します。

import mysql.connector
cnx = mysql.connector.connect(user=USER,password=PASSWORD,host=HOST)
cursor = cnx.cursor(buffered=True)

次にmysql.connectorをimportし、cursorオブジェクトを用意します。
USER,PASSWORD,HOSTにはMySQLで登録したユーザー情報を入力しておきます。
このcursorオブジェクトを使用してDBと接続し、クエリの発行を行います。

import json
match = API.get_match_details(match_id=some_match_id)
players_json = json.dumps({"players":match['players']})
picks_bans_json = json.dumps({'picks_bans':match['picks_bans']})

Pythonの標準ライブラリであるjsonをimportしておきます。
some_match_idには取得したい試合のmatch_idを入れます。
そしてちょっと無理やりですが、上記のような形でjsonで入れたいデータを作製します。
json.dumps()dictを引数にとることでjson形式の文字列を返すことができます。

insert_query = (
                """INSERT INTO `DATALAKE` 
                (`match_id`,`playerdata`,`duration`,`lobby_name`,`picks_bans`,`radiant_win`) 
                VALUES (%s,%s,%s,%s,%s,%s)"""
               )
insert_data = (
               match['match_id'],players_json,match['duration'],
               match['lobby_name'],picks_bans_json,match['radiant_win']
              )

発行するクエリとtuple型のデータを上記のように準備します。
クエリ内の%sと記載されている部分は、フォーマット指定子といい、データと合わせて下記記載の工程executeを行うことで、文字列に置換を行うことができます。
このような記述をしておくことで、コードの可視性の向上、クエリ文の再利用などが可能になります。
クエリに記述してあるものと同じ順番でinsert_dataに挿入するデータを並べておく必要があります。

cursor.execute("USE YOUR_DATABASE")
cursor.execute(insert_query,insert_data)
cnx.commit()

cursor.execute()では引数に文字列を入れることで、DBにクエリを発行することができます。
また引数にクエリとデータを用いることで、上記クエリ内のフォーマット指定子%sがデータに用意した文字列で置換された上でMySQLにクエリが発行されます。
最後にcommit()を行なわなければクエリの実行結果が保存されないので気を付けましょう。

これで1試合分のデータを用意することができました。
レコードのイメージは以下のような感じです。数値は適当です。

match_id players duration lobby_name radiant_win
4398348348 {"players":[....]} 7000 Ranked 1
次は先ほどjsonで挿入したデータを分割し、データウェアハウスを作製します。

###データの分割
データレイクからplayersを取り出し、データウェアハウスに挿入を行います。

getplayers_query = (r"""SELECT JSON_EXTRACT(`playerdata`,"$.players") FROM `DATALAKE` WHERE matchid = %s""")
getplayers_data = (some_match_id,)
cursor.execute(getplayers_query,getplayers_data)

今回も発行するクエリとデータをそれぞれ用意します。今回のデータの要素数は1つとなっていますが、その際に,を最後に記載しないとtupleにはならないので注意しましょう。
上記のコードではカラムplayerdataのデータを取得するクエリを書いています。
some_match_idには先ほど挿入したデータのmatchidを入れましょう。
MySQLではカラムのデータ型にjsonを指定することができます。その内部の値の検索を可能にするのがJSON EXTRACTです。
JSON_EXTRACT(カラム,$.key)といった形で利用します。指定したカラムを対象とし、指定したkeyで値の抽出を行い、そのvalueを出力します。
playersの中身は{'players:[.....]'}の様な形になっているので、今回のクエリでは配列が返ってきます。

result = cursor.fetchone()
result_list = json.loads(result[0])
for i in range(10):
    eachdata = result_list[i]
    eachdata['match_id'] = some_match_id
    player_query = (
                       """INSERT INTO `DATAWAREHOUSE` 
                       (`match_id`,`player_slot`,`hero_id`,`item_0`,`item_1`,`item_2`,`item_3`,
                       `item_4`,`item_5`,`backpack_0`,`backpack_1`,`backpack_2`,`kills`,`deaths`,
                       `assists`,`leaver_status`,`last_hits`,`denies`,`gold_per_min`,`xp_per_min`,
                       `level`,`hero_damage`,`hero_healing`,`tower_damage`) 
                       VALUES 
                       (%(match_id)s,%(player_slot)s,%(hero_id)s,%(item_0)s,%(item_1)s,%(item_2)s,
                        %(item_3)s,%(item_4)s,%(item_5)s,%(backpack_0)s,%(backpack_1)s,%(backpack_2)s,
                        %(kills)s,%(deaths)s,%(assists)s,%(leaver_status)s,%(last_hits)s,%(denies)s,
                        %(gold_per_min)s,%(xp_per_min)s,%(level)s,%(hero_damage)s,%(hero_healing)s,%(tower_damage)s)"""
                       )
    cursor.execute(player_query,eachdata)
cnx.commit()

curos.fetchone()でクエリの結果をtupleで取得できます。今回は1行だけなので、要素も1つだけです。
json.loads()は文字列を引数にとることで、Pythonで扱える型にデコードをしてくれます。
つまり今回でいえば、json.dumpsで変換前の「dictを要素として持つlist」を返します。
playersは1試合内のそれぞれのheroのデータとなるので要素数は10となります。
そのそれぞれに対して、データを取り出し、DBに挿入する処理を行います。

playersにはmatch_idの要素は含まれていないので、for文で取り出したdictに追加しておきます。
先ほど発行したクエリと同じく、クエリ文とデータをわけて準備するのですが、先ほどは%sとしてある演算子が今回は%(...)sとなっていますね。
これはデータにdictを用いる場合に使えるのですが、()内にKeyを指定することで、目的のValueに置換することができます。
今回はdictのデータが既にあり、また挿入するカラムが多いので混乱を招かないためにこのような形を採用します。
あとはcursor.executeでクエリを発行し、commit()を行って完了です。
以下のようなイメージのレコードが挿入できました。数値は適当です。

auto_increment_id match_id player_slot ..... ..... role
1 4398348348 0 .... ....
... ... ... .... .... ...
10 4398348348 132 .... ....

###roleの判定
最後にデータウェアハウスに挿入したデータからroleの判定を行い、データウェアハウスに追加します。
roleの判定の仕方ですが、今回はシンプルにgpmが高い順番にpos1-5を決定していくことにします。

role_get_sql = ("SELECT `gold_per_min`,`player_slot` FROM `DATAWAREHOUSE` WHERE `match_id` =%s ORDER BY `player_slot`")
role_get_data = (some_match_id,)
cursor.execute(role_get_sql, role_get_data)
result = cursor.fetchall()

またまたクエリ文とデータを準備し、クエリを発行します。
今回は先ほど挿入したデータからgold_per_min,player_slotを取得します。
ORDER BYを指定することで、並び順をplayer_slotで昇順にソートしています。
player_slotというのは1試合にいるプレイヤーの並び順を示します。
Radiantのプレイヤーは(0,1,2,3,4),Direのプレイヤーには(128,129,130,131,132)が与えられます。
これが1つの試合におけるそれぞれプレイヤーのユニークな値となります。

今回は先ほどと異なり、cursor.fetchall()を使用してデータを取得します。
fetchone()では1行のデータを取得するのに対し、fetchall()では全行のデータを取得できます。
上記のクエリでは1試合に含まれる全プレイヤーのデータ、つまり全部で10行のデータを求めています。
それをまとめて取得することで、tupleとしてデータが返ってきます。

((some_gpm,0),(some_gpm,2),........,(some_gpm,131),(some_gpm,132))

上記のような要素を持つtupleが取得できます。

radiant = (result[0], result[1], result[2], result[3], result[4])
dire = (result[5], result[6], result[7], result[8], result[9])

role_radiant = sorted(radiant, key=lambda x: x[0], reverse=True)
role_dire = sorted(dire, key=lambda x: x[0], reverse=True)

次に、radiant,dire陣営それぞれに関してtupleにまとめます。
まとめたtupleに対して、sorted関数を使用し、gold_per_minについてソートを行います。
sortedは引数に入れたtupleをソートした新たなオブジェクトを生成して返します。
radiant,dire各要素のtupleの中身は(gold_per_min,playerslot)となっているので、1番目の要素についてソートを行いたいですね。

sortedは引数に関数をkeyとして渡すことで、その関数の戻り値を参照してソートを行ってくれます。
ここでは、lamda式を使用して関数を定義します。
lamda式とは lamda 引数 : 戻り値といった形で定義される、名前を付けることなくその場で使用できる関数です。
今回は戻り値の部分にtupleの1番目の要素を返す記述をすることで、ソート対象をgold_per_minの値に指定できます。

role_list_radiant = []
role_list_dire = []

for i in range(4):
    role_list_radiant.append(role_radiant.index(radiant[i]))
    role_list_dire.append(role_dire.index(dire[i]))

role_list_radiant = list(map(lambda x:x+1,role_list_radiant))
role_list_dire = list(map(lambda x:x+1,role_list_dire))

それぞれの陣営ごとに空listを作成します。
listの持つindexメソッドでそれぞれの要素が何番目にソートされたかが分かります。
それがそのままroleに繋がるのですが、この場合0-4のインデックスがそれぞれ与えられます。
実際の値としては1-5の範囲が欲しいので、map関数を使用して加工をします。
map関数はmap(関数,対象の配列)の形をとることで配列に対して同じ関数でまとめて処理を行えます。
ここでもlamda式を使用しlist内全ての要素に1を加算し、pos1-5の表記に整えます。

role_insert_query = ("UPDATE `DATAWAREHOUSE` SET `role`=%s WHERE `match_id`=%s AND `player_slot`=%s")
role_insert_values = []

for i, role in zip(range(0,4),role_list_radiant):
    each_tuple = (role,some_match_id,i)
    role_insert_values.append(each_tuple)

for i, role in zip(range(128,133),role_list_dire):
    each_tuple = (role,some_match_id,i)
    role_insert_values.append(each_tuple)

cursor.executemany(role_insert_query,role_insert_values)
cnx.commit()

クエリ文、データを準備します。
今回のクエリはINSERTではなくUPDATEを使用することで、既にデータが存在するレコードに対して更新が行えます。
データの準備に関してはzip関数によって複数の要素に対してfor文を回すことができます。
(role,match_id,player_slot)といった要素を持つlistを生成しています。
また今回はexecuteではなくexecutemanyというメソッドを使用しています。
これはデータに配列を用いることで、複数のデータに対して同じクエリを次々と発行することができます。
1つ1つクエリを発行するよりもまとめて発行したほうが処理速度が速く、コードも短い行で済むのでこの形をとっています。
最後にcommit()を行い完了です。 

##並列処理でのデータ収集
上記では試合データ1件に対しての処理を実装しました。
ですが、実際にはもっとたくさんの件数のデータが分析には必要です。
そこで、先ほど実装した処理を関数化し、繰り返し処理していきます。
その速度向上のため、joblibというライブラリを使用した並列処理を実装していきます。

from joblib import Parallel, delayed

joblibからParallel,delayedをimportします。

def insert_into_datalake(some_match_id):
    try:
        #データレイクに入れる処理
    except:
        print("match_id {} not found".format(some_match_id))       

まずは先ほどのAPIからデータを取得する処理を関数として定義します。
関数の引数としてmatch_idを設定することで、異なる複数の試合データを取得できるようにします。
ここで1つ注意ですが、Dota2には試合データが存在しないmatch_idというものがあります。
恐らくはpractice gameやmod gameなどにmatch_idが割り振られているため、データが取得できないというような感じだとは思いますが、とにかくデータを取得するのに失敗するmatch_idが存在します。
そこで、try,exceptを使用した例外処理を記述します。
これによりtry以下の処理が失敗しエラーが出た場合、except以下の処理が実行されることで、途中でエラーを出すことなく繰り返し処理の実行が可能になります。

def use_joblib_insert(startid,upto_number):
    Parallel(n_jobs=4,verbose=10)([delayed(insert_into_datalake)(some_match_id) for some_match_id in range(startid,startid + upto_number)])

joblibを使用した並列処理も関数化します。ここで使用しているjoblibのメソッドを解説します。
delayeddelayed(関数)(引数)の形をとり、(関数,引数)の形のtupleを返します。
ParallelParallel(オプション引数)(list)の形をとり、list内のdelayedで生成されたtupleについて並列処理を実行します。
つまり上記関数で実行している処理は

  1. delayedParallelに与えるtupleを生成
  2. 内包表記で上記tuplelistを生成
  3. 与えられたオプション引数、listを元に並列処理を実行

というフローになっています。
そしてオプション引数ですが、n_jobsは並列処理を実行するコア数、verboseはログを出力する頻度(10で最大,52以上で結果も出力)となっています。
並列数はもっとたくさん振ってもいいのですが、あまり早くリクエストを送りすぎると取得制限がかかってしまうため、控えめに設定してあります。
次にuse_joblib_insertの引数の説明をします。
startidは一番最初に取得したい試合のmatch_id,upto_numberstartidから何試合分までを取得するか、という内容になっています。

あるmatch_idから1000万件先まで取得したい場合は

use_joblib_insert(some_match_id,10000000)

といった形で実行します。

def player_splitter(some_match_id):
    try:
        #データ分割の処理
    except:
        print("failed to split data")

def role_hantei(some_match_id):
    try:
        #データ分割の処理
    except:
        print("failed to split data")

データ分割、role判定も同じように関数化した後joblibで並列処理したいのですが、先ほどと実装が異なる点があります。
今回の場合、引数として与えるmatch_idは「データレイクに入れた試合データのmatch_id」であるはずです。
なので、引数として与えたいmatch_idをデータレイクから入手する必要があります。

import pandas as pd
import numpy as np

ここでPythonのライブラリ、pandasnumpyをimportします。
pandasはデータ操作や処理を、numpyは数値計算を行うライブラリです。

conn = mysql.connector.connect(user=USER, password=PASSWORD, host=HOST,database=DATABASE)
get_match_id_sql = ("SELECT `match_id` from `DATALAKE`")

match_df = pd.read_sql(get_match_id_sql,conn)

pandas用にmysql.connectorの接続オブジェクトを生成します。今回はキーワード引数にdatabaseを与えてあげています。
クエリ文の内容は、データレイクからmatch_idだけを全取得するというものです。
pd.read_sql(クエリ、接続オブジェクト)を使用することで、クエリの結果が入ったpandasのデータフレームオブジェクトが生成されます。
データフレームというのは行列データが入ったオブジェクトで、数々のメソッドを使用してデータ操作が容易に行えます。
今回の場合は列はmatch_idのみの1列、行はデータレイクに入れたレコード数だけの行数のデータとなっています。

match_df_arr = np.array(match_df['matchid'])
match_df_arr = map(int,match_df_arr)

np.array()は引数にオブジェクトを与えることでNumPy配列を生成できます。
データフレームからlistを生成することもできますが、サイズの大きい配列だとNumPy配列の方が処理が高速なのかなと考え、このような形にしてます。(未検証)
MySQLに入れられる型にするため、map関数でint型に変換しています。

Parallel(n_jobs=8,verbose=10)([delayed(player_splitter)(some_match_id) for some_match_id in match_df_arr])

あとはfor文を回すオブジェクトに先ほど生成したmatch_df_arrに指定することで処理を行います。
今回はリクエスト制限などは無いので、並列数は8に設定しています。
role判定に関しても同様に実装できますので、詳細は割愛します。

以上で並列処理による大量のデータ収集及び加工が実装できました。

##今後
これまでの手順でDota2用の分析のデータを集めることができました。
今後これを用いてデータ集計などによる分析を行う予定です。
またランダムフォレストによる特徴量決定、word2vecによるheroの類似度、あるいはアンチピックモデルの構築を考えています。

そして今回はローカルにデータを集約しましたが、スケールアウトの容易性やアクセシビリティなどを考えると、クラウド上にデータを集約したほうが良さそうです。
パッチ毎にデータを収集する必要性があることから、ワークフローを組んでデータを収集の自動化なども行いたいです。
なので今後のプランとして

  • Luigiなどのフレームワークを利用したワークフローの作成
  • データレイクをCloud Firestore,データウェアハウスをBigQueryなどを利用して構築

を実装していきたいと思います。

4
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?