LoginSignup
7
5

More than 3 years have passed since last update.

[GCP]CloudStorageのファイル作成トリガでDataflowを起動して変換したかった

Last updated at Posted at 2018-04-21

GCPはほぼ初心者です。
どのような連携になるのかイメージをつかみたかったので簡単なものを作ってみました。

結果から言いますと、完成していません

GoogleのAPI呼び出しで権限エラーが発生しており、その原因が特定できていません。いろんな理由により別の方法をとることになったため中途半端なのですが、やったこと自体は残しておきたいと思います。

やりたいこと

CloudStrageにファイルをアップロードすると、即座にDataflowが動いて変換処理を行い、別バゲットへファイルを吐き出す

準備

  • Google Cloud Storageバゲットを2つ作成
    • test-in-raw 生データを格納
    • test-in-format 整形済のデータを格納

Dataflowの作成

まずはチュートリアル

テキスト中の文字をカウントするWordCountというGCP公式サンプルです。
https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven?hl=ja

「Cloud Dataflow サービス上でサンプル パイプラインを実行する」のほうがGCPのコンソール上で実施できるため、環境構築が不要となりオススメです。

少しわかりにくかった部分を解説すると
* INPUTは公開設定されているCloudStorage上のファイルですので準備する必要はありません
* stagingLocationとはJobの実行ファイルを吐き出す場所のようです

Jobの一覧画面
スクリーンショット 2018-04-14 17.14.47.png

Jobの詳細画面
グラフを見ると数秒で終わっているのに、Jobが3分半かかっているのが謎でしたがコードのコンパイルや、実行グラフの作成などを都度行なっているのでこうなっているのかなと考えてます。
スクリーンショット 2018-04-14 17.15.38.png

テンプレート作成

手動実行は単発のJob実行を想定しているようで、スケジュール実行など何度も実行するようなJobはテンプレート化したほうがよいようです
https://cloud.google.com/dataflow/docs/templates/overview?hl=ja

従来の Cloud Dataflow のデプロイと比較すると、テンプレートには次のようなメリットがあります。

  • パイプラインの実行時にコードを毎回再コンパイルする必要がありません。
  • 従来のデプロイでは一般的な、開発環境や関連する依存関係なしでパイプラインを実行できます。これは、繰り返し実行するバッチジョブをスケジュールする際に便利です。
  • ランタイムのパラメータによって実行をカスタマイズできます。
  • 技術者以外のユーザーでも、Google Cloud Platform Console、gcloud コマンドライン ツール、または REST API を使用してテンプレートを実行できます。

テンプレートサンプルもgoogleで用意されていますが、今回はチュートリアルに沿って自分で作成してみようと思います。
https://cloud.google.com/dataflow/docs/templates/creating-templates?hl=ja

mvn archetype:generate \
      -DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
      -DarchetypeGroupId=com.google.cloud.dataflow \
      -DarchetypeVersion=2.2.0 \
      -DgroupId=com.nyasba.gcp \
      -DartifactId=dataflow-template \
      -Dversion="0.1" \
      -DinteractiveMode=false \
      -Dpackage=com.nyasba.gcp

これをIDEに取り込み、コードを修正します。

  • WordCount.javaのOptionをValueProviderに変更
  • WindowedWordCountの方もValueProviderに変更していく
  • WriteOneFilePerWindow内でFileBasedSink.convertToFileResourceIfPossible(filenamePrefix)はget()で値を取得

GCPのSDKをインストールしてログインした状態で、以下のコマンドを実行するとtemplateがアップロードされます。

mvn compile exec:java \
    -Dexec.mainClass=com.nyasba.gcp.WordCount \
    -Dexec.args="--runner=DataflowRunner \
                  --project=[project_name] \
                  --stagingLocation=gs://test-in-format/staging \
                  --output=gs://test-in-format/output \
                  --templateLocation=gs://test-in-format/templates/wordcount-template"

templateが作成されたというメッセージは出ますが、よくわからない点が2つ。
ただ、テンプレートはできてるので前に進みます

(1) ValueProviderからgetした事によるエラー

ResourceIdをValueProviderのまま変換してないので発生している模様。

4 15, 2018 2:23:51 午前 org.apache.beam.runners.dataflow.internal.CustomSources serializeToCloudSource
警告: Size estimation of the source failed: RuntimeValueProvider{propertyName=inputFile, default=gs://apache-beam-samples/shakespeare/kinglear.txt}
java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=inputFile, default=gs://apache-beam-samples/shakespeare/kinglear.txt}
    at org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:235)

(2) テンプレート作成後にヌルポ

4 15, 2018 2:23:53 午前 org.apache.beam.runners.dataflow.DataflowRunner run
情報: Template successfully created.
[WARNING]
java.lang.NullPointerException
    at org.apache.beam.runners.dataflow.DataflowPipelineJob.getJobWithRetries (DataflowPipelineJob.java:503)

テンプレートメタデータ作成

テンプレートのINPUTを定義するメタデータファイルをwordcount-template_metadataというファイル名で作成して、templateと同じフォルダに配置しておきます

{
  "name": "WordCountCustom",
  "description": "hogehoge",
  "parameters": [{
    "name": "inputFile",
    "label": "Input Cloud Storage File(s)",
    "help_text": "Path of the file pattern glob to read from.",
    "regexes": ["^gs:\/\/[^\n\r]+$"],
    "is_optional": true
  },
  {
    "name": "output",
    "label": "Output Cloud Storage File Prefix",
    "help_text": "Path and filename prefix for writing output files. ex: gs://MyBucket/counts",
    "regexes": ["^gs:\/\/[^\n\r]+$"]
  }]
}

テンプレートからの実行

コンソールでもできますが、CLIを使って実行します。inputFileは省略してoutputのみ指定。

gcloud beta dataflow jobs run template-job2 \
        --gcs-location gs://test-in-format/templates/wordcount-template \
        --parameters output=gs://test-in-format/output
createTime: '2018-04-14T17:45:09.108777Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: 2018-04-14_10_45_08-1360422684909966698
location: us-central1
name: template-job2
projectId: ...
type: JOB_TYPE_BATCH

無事成功です!

ただ、実行時間があまり変わってないですね・・。やはり都度実行グラフを解析しているところが影響しているようです。
ValueProviderで外から与えられたパラメータにより実行グラフが変わることがあるからなのでしょうか・・

スクリーンショット 2018-04-15 2.48.52.png

Dataflowを起動するFunctionsの作成

CloudStorageのファイルトリガーで動くFunctionを作成し、その中でDataflowのJobを起動する形にします。

Functionsに対して、AWSのロールのようなものがあればよかったのになぁ・・と思いつつ、試行錯誤してtokenを取得してUNAUTHORIZEDのエラーが出るところまでは進みました(service accountで取得したjsonはDLしてコードと一緒にアップロードする必要があります)

あとは権限エラーの原因をデバックして行きましたが、一番緩いオーナー権限でもUNAUTHORIZEDは解決せず。。原因不明のままお蔵入りです。

コードものせておきます。

/**
 * Background Cloud Function to be triggered by Cloud Storage.
 *
 * @param {object} event The Cloud Functions event.
 * @param {function} callback The callback function.
 */
exports.processFile = (event, callback) => {
    const file = event.data;

    if (file.resourceState === 'not_exists') {
      console.log(`File ${file.name} deleted.`);
    } else if (file.metageneration === '1') {
      // metageneration attribute is updated on metadata changes.
      // on create value is 1
      console.log(`File ${file.name} uploaded.`);

      // 本来はパスを渡す必要があるが、認証でずっと失敗してたのでそこまでたどり着かず。
      executeDataflow();
    } else {
      console.log(`File ${file.name} metadata updated.`);
    }

    callback();
};

const auth = require('google-auth-library');
const key = require('./service_account_key.json')
const rp = require('request-promise');

function executeDataflow(){
  const client = new auth.JWT(
    key.client_email,
    null,
    key.private_key,
    ['https://www.googleapis.com/auth/cloud-platform']
  ).authorize()
  .then(function (credentials) {
    console.log(`token: ${credentials}`);
    callApi(credentials.access_token);
  }).catch(function (err) {
    console.log(err);
  });
}

function callApi(token){
  var options = {
    method: 'POST',
    uri: 'https://dataflow.googleapis.com/v1b3/projects/[project_name]/templates:launch?gcsPath=gs://test-in-format/templates/wordcount-template',
    headers : {
      "Authorization" : "Bearer " + token
    },
    body: {
      "jobName": "template-job-from-api",
      "parameters": {
        "inputFile" : "gs://test-in-raw/my_input.txt",
        "output": "gs://test-in-format/output"
      }
    },
    json: true // Automatically stringifies the body to JSON
  };

  rp(options)
    .then(function (parsedBody) {
      console.log(parsedBody);
    })
    .catch(function (err) {
      console.log(err);
    });
}

無念。

7
5
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
5