Go+GAE+Task QueueでCSVファイルを読み込んでCloudDatastoreに記録 その1

  • 4
    いいね
  • 2
    コメント

今回はTaskQueueを使ってみます。
「その1」では認証すら通していないかなり簡単な使い方ですので、実践的な学習はその2で行う予定です。
(と言っても、実行時の認証・登録した情報の画面表示程度の予定)

今回やりたいこと

  1. TaskQueueを利用してみる
  2. HTTPでファイルを取得する
  3. CSVファイルのライブラリ利用と文字コード変換を行う
  4. 読み込んだデータをDataStoreに保存する
  5. DataStoreに保存するEntryで項目除外のタグとpriate要素を使ってみる

これだけです。

前提条件

過去の記事「WindowsのEclipseでGo+AppEngineの開発環境を構築」で構築した環境を利用しています。

事前準備

何でもいいのでCSVファイルをHTTPで取得できるサイトを準備 or 探してみます。
筆者は「csv サンプル」でググって一番上に出てくるサイトのファイルを利用させていただきました。

コーディング開始

entry.go

前回のように拘ったことはせずにシンプルな作りにしてあります。

5. DataStoreに保存するEntryで項目除外のタグとpriate要素を使ってみる

「やりたいことリスト」の上記を行うために以下2点を実施しました。

  • BunkEntryのtransferClientをprivate(小文字始まり)にする
  • BunkEntryのErrに"-"タグを付ける

うまくいけばこの2要素はDatastoreに保存されません。

entry.go
package taskqueue

import "strconv"

// BunkEntry Datastoreに格納する用の振り込み情報を保持したEntry
type BunkEntry struct {
    BankCode       int
    BranchCode     int
    Subject        int
    AccountNumber  int
    Name           string
    Price          int
    transferClient string
    Err            error `datastore:"-"`
}

func (e *BunkEntry) convertCSVtoBunkEntry(row []string) error {
    e.BankCode = e.strToInt(row[1])
    e.BranchCode = e.strToInt(row[2])
    e.Subject = e.strToInt(row[3])
    e.AccountNumber = e.strToInt(row[4])
    e.Name = row[5]
    e.Price = e.strToInt(row[6])
    e.transferClient = row[7]
    return e.Err
}

func (e *BunkEntry) strToInt(str string) int {
    if e.Err != nil {
        return 0
    }
    var intVal int
    intVal, e.Err = strconv.Atoi(str)
    return intVal
}

main.go

たいしたことは何もしていません。
が、http経由でファイルを取得する方法がすぐに分からず意外と苦労しました。
http.GetはGAEでは利用できませんのでご注意ください。

  1. http://localhost:8080/ で「ok」だけ表示され、
  2. http://localhost:8080/task でTaskQueueにpushされ、
  3. http://localhost:8080/job がキューとして実行される

それだけのシンプルな処理です。

main.go
package taskqueue

import (
    "bytes"
    "encoding/csv"
    "io"
    "io/ioutil"
    "net/http"
    "net/url"

    "golang.org/x/net/context"
    "golang.org/x/text/encoding/japanese"
    "golang.org/x/text/transform"
    "google.golang.org/appengine"
    "google.golang.org/appengine/datastore"
    "google.golang.org/appengine/taskqueue"
    "google.golang.org/appengine/urlfetch"
)

func init() {
    http.HandleFunc("/", indexHandler)
    http.HandleFunc("/task", taskHandler)
    http.HandleFunc("/job", jobHandler)
}

// index用のhandler。今回はokを表示するだけ。
func indexHandler(w http.ResponseWriter, r *http.Request) {
    respond(w, r, 200, "ok")
}

// taskqueueを実行するためだけのhandler。ノーガードなのでこのまま本番反映すると死にます
func taskHandler(w http.ResponseWriter, r *http.Request) {
    ctx := appengine.NewContext(r)

    //taskqueueにpush
    task := taskqueue.NewPOSTTask("/job", url.Values{})
    taskqueue.Add(ctx, task, "default")
}

// taskqueueとして実行されるhandler
func jobHandler(w http.ResponseWriter, r *http.Request) {

    url := "http://www.*****.com/****.csv"
    ctx := appengine.NewContext(r)
    client := urlfetch.Client(ctx)
    resp, err := client.Get(url)
    if err != nil {
        respond(w, r, http.StatusInternalServerError, err.Error())
        return
    }
    defer resp.Body.Close()

    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        respond(w, r, http.StatusInternalServerError, err.Error())
        return
    }

    // CSVファイルがShift-JISなのでUTF-8に変換して読込
    reader := csv.NewReader(transform.NewReader(bytes.NewReader(body), japanese.ShiftJIS.NewDecoder()))

    for {
        row, err := reader.Read()

        if err == io.EOF {
            break
        }
        if err != nil {
            // 読み込みエラー発生
            respond(w, r, http.StatusInternalServerError, err.Error())
            break
        }

        // 今回のサンプルファイルでは1列目がデータ判別用となっているため、1以外は破棄する
        if row[0] != "1" {
            continue
        }

        if err := csvRowImport(ctx, row); err != nil {
            respond(w, r, http.StatusInternalServerError, err.Error())
        }
    }
}

// csv1行分を処理
func csvRowImport(ctx context.Context, row []string) error {
    entry := &BunkEntry{}
    if err := entry.convertCSVtoBunkEntry(row); err != nil {
        return err
    }
    return addDatastore(ctx, entry)
}

// datastoreにentryを登録
func addDatastore(ctx context.Context, entry *BunkEntry) error {
    key := datastore.NewIncompleteKey(ctx, "Bunk", nil)
    _, err := datastore.Put(ctx, key, entry)
    return err
}

// かなり適当なレスポンス用メソッド
func respond(w http.ResponseWriter, r *http.Request, status int, body string) {
    w.Write([]byte(`
        <html>
            <head>  
                <title>task-queue sample</title>
            </head>
            <body>
                ` + body + `
            </body>
        </html>
    `))
}

実行

ローカル環境でのGAEエミュレータを起動

さすがにTaskQueueの実行がノーガードな状態でGAE上に上げる勇気はないので、またまたAppEngine SDKにて実行。

コンソールかEclipseから実行
goapp serve {eclipse_project}\src

{eclipse_project}は筆者の場合はC:\works\eclipse\workspace\gae-sample

実行時ログ
INFO     2016-11-11 01:48:09,427 devappserver2.py:769] Skipping SDK update check.
INFO     2016-11-11 01:48:09,661 api_server.py:205] Starting API server at: http://localhost:53634
INFO     2016-11-11 01:48:09,668 dispatcher.py:197] Starting module "default" running at: http://localhost:8080
INFO     2016-11-11 01:48:09,673 admin_server.py:116] Starting admin server at: http://localhost:8000
INFO     2016-11-11 01:49:43,549 module.py:788] default: "GET /index HTTP/1.1" 200 108
INFO     2016-11-11 01:49:46,552 module.py:788] default: "GET /task HTTP/1.1" 200 -
INFO     2016-11-11 01:49:46,684 module.py:788] default: "POST /job HTTP/1.1" 200 -

http://localhost:8080/task 実行後に自動で/jobが実行されました。
ではDatastoreの中身を見てみます。

Datastoreの状態

datastore_04.PNG

privateな「transferClient」はもちろん、publicな「Err」もタグのおかげで保存されていませんでした。成功!

~ TaskQueueについての学習が深まり次第「その2」を更新予定です ~