1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

AthenaからGoogle スプレッドシートへの連携

Posted at

背景

デジタル広告事業でデータレイクとかデータマートみたいなことを引き続きやってます
もともとTableau Serverで可視化したくて作ったのですが、データレイクからのデータマートみたいなものはあるので、可視化の先は何でも良い訳です。

人も増え、要件によっては、スプレッドシートに出力したい、ということが増えました。

やったこと

  • Atehna向けのSQLを置いておくと日次でスプレッドシートが更新されていくやつ
  • SELECTされる1列目はシート名というルールにし、group by 単位でシート作成
  • SELECTした列の数だけ良い感じにシートに貼り付ける
  • 数値は数値型で、文字列は文字列型でシートに貼り付ける

SQLさえ書いてしまえば、それ以降の運用はエンジニアから手が離れる、というのは良い点かと思います。システム変更依頼ではなくデータ抽出依頼にする ことで、工数も心理的ハードルも下げられるのではないかと思ってます。
また、仕組上、既に運用で利用しているシートなどにそのまま乗っけられることもポイントかと思います。

INPUT

・スプレッドシートのシートのID
・SQLファイル

シートのIDは、URLから取れるやつです。
実際にはスプレッドシートのURLを貰って、こちらでシートのIDを抽出して、連携用に発行したサービスアカウントIDの編集権限を付けてから実施してます。

code

ShellScriptでAthenaの結果を取得し、その結果ファイルをPythonを通してスプレッドシートに反映させる感じにします。

抜粋.sh
# INPUT:QUERY=SQLのファイルがあるパス
# INPUT:SHEETID=スプレッドシートのID
TIMEOUT=20
INTERVAL=5
QUERYSTR=`cat $QUERY`
RES=`aws athena start-query-execution --query-string "${QUERYSTR}"  --result-configuration OutputLocation=s3://xxxxxxxxxx/xxxxxxx/xxxxxxx/`

for i in `seq 1 ${TIMEOUT}`
do
  if [ `aws athena get-query-execution --query-execution-id ${RES}|grep SUCCEEDED|wc -l` -ne 0 ];then
    break
  else

    if [ `aws athena get-query-execution --query-execution-id ${RES}|grep FAILED|wc -l` -ne 0 ];then
      aws athena get-query-execution --query-execution-id ${RES}|grep FAILED
      exit 1
    fi
    sleep ${INTERVAL}
  fi
done
S3PATH=`aws athena get-query-execution --query-execution-id ${RES} |grep RESULTCONFIGURATION|awk '{print $2}'`

OUTPUTFILEPATH=./output/${SHEETID}.csv
/usr/bin/aws s3 cp ${S3PATH} ${OUTPUTFILEPATH}
/usr/bin/python3 push_data.py  ${SHEETID}  ${OUTPUTFILEPATH}

日次処理を前提としているので、sleepのインターバルは適当です。

以下、Pythonのコードです。
色々ご容赦ください(特に、カラム数に対して横を決めるの、コードの可読性も含めた上手い方法ってあるんでしょうか。。。)

※既存のシートは消さず更新だけするため、SQLの内容次第ではSQLの結果=スプレッドシートの内容とならず不整合が起きるのでご注意ください。

push_data.py
import sys
import csv
import datetime
import gspread
from oauth2client.service_account import ServiceAccountCredentials

args = sys.argv

scope = ['https://spreadsheets.google.com/feeds',
         'https://www.googleapis.com/auth/drive']

TARGET_SHEET_KEY=args[1]
DATACSV=args[2]
credentials = ServiceAccountCredentials.from_json_keyfile_name('service.json', scope)
gc = gspread.authorize(credentials)
wkb = gc.open_by_key(TARGET_SHEET_KEY)
sheetname_set= set()
for sheet in wkb.worksheets():
    sheetname_set.add(sheet.title)

sheetnameset=set()
pre_sheetname="dummy"
idx=0
rowcount=0;
linecount=0;
rowalphabet={1 : 'A',2 : 'B',3 : 'C',4 : 'D',5 : 'E',6 : 'F',7 : 'G',8 : 'H',9 : 'I',10 : 'J',11 : 'K',12 : 'L',13 : 'M',14 : 'N',15 : 'O',16 : 'P',17 : 'Q',18 : 'R',19 : 'S',20 : 'T',21 : 'U',22 : 'V',23 : 'W',24 : 'X',25 : 'Y',26 : 'Z',27 : 'AA',28 : 'AB',29 : 'AC',30 : 'AD',31 : 'AE',32 : 'AF',33 : 'AG',34 : 'AH',35 : 'AI',36 : 'AJ',37 : 'AK',38 : 'AL',39 : 'AM',40 : 'AN',41 : 'AO',42 : 'AP',43 : 'AQ',44 : 'AR',45 : 'AS',46 : 'AT',47 : 'AU',48 : 'AV',49 : 'AW',50 : 'AX',51 : 'AY',52 : 'AZ'}
header=[]
isHeader=True;
with open(DATACSV, 'r', encoding='utf8') as f_in:
        reader = csv.reader(f_in)
        for line in reader:
            linecount+=1
            for i in range(len(line)):
                # 1列目はシート名が入っている必要がある
                sheetname = line[0]
                # 1行目はヘッダーが入っている必要がある
                if linecount == 1:
                    header.append(line[i])
                    continue
                if sheetname != pre_sheetname :
                    if pre_sheetname != "dummy" :
                        wks.update_cells(cell_list)
                    if not sheetname in sheetname_set :
                        wkb.add_worksheet(title=sheetname, rows=5000, cols=30)
                        sheetname_set.add(sheetname)
                    wks = wkb.worksheet(sheetname)
                    srange ='A1:'+rowalphabet[len(header)]+'3000'
                    cell_list = wks.range(srange)
                    idx=0
                    for f in header:
                        cell_list[idx].value = f
                        idx+=1

                sheetnameset.add(sheetname)
                c_val = line[i]

                try :
                    c_val = float(c_val)
                except :
                    try:
                        c_val = int(c_val)
                    except :
                        pass
                cell_list[idx].value = c_val #1列目はsheetname
                idx+=1
                pre_sheetname = sheetname
wks.update_cells(cell_list)

感想

いろいろ楽になりました。
ここから更にGoogleデータポータルで可視化、というのも出来ると思うので夢は広がります。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?