LoginSignup
2
0

More than 1 year has passed since last update.

GASからAmazon Athena APIでデータを取得する

Posted at

ゴール

  • API呼び出し部分は Standalone Script で実装する
  • 外部ライブラリは使用しない
  • スプレッドシート等の Container Bound Script からライブラリとして読み込んで使用する
  • スクリプトエディタ上で引数や戻り値の入力補完ができるようにする

Amazon Athena API

APIの一覧はここにあります。
AWS SDK が使える環境であればその方が早いのですが、GASでは利用できないのでHTTPで直接APIを叩く必要があります。
生のリクエストは以下のようになります。

POST https://athena.ap-northeast-1.amazonaws.com/ HTTP/1.1
Host: athena.ap-northeast-1.amazonaws.com
Authorization: AWS4-HMAC-SHA256 Credential=XXXXXXXXXX/20220101/ap-northeast-1/athena/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date;x-amz-target, Signature=XXXXXXXXXX
Content-Type: application/x-amz-json-1.1
Content-Length: 30
X-Amz-Content-Sha256: XXXXXXXXXX
X-Amz-Date: 20220101T090000Z
X-Amz-Target: AmazonAthena.StartQueryExecution

{"QueryString":"SELECT NOW()"}

HTTPメソッドはPOST固定
リージョンは ap-northeast-1 = アジアパシフィック(東京)
Actionは X-Amz-Target ヘッダで指定します

AWSリクエストへの署名(署名バージョン4)

AWSのAPIへリクエストを送信する際に、署名を計算して Authorization ヘッダに含める必要があります。
署名バージョン 4 の署名プロセス に従い、署名を計算します。

sigv4.gs
/**
 * @typedef {Object} Request
 * @property {string} method
 * @property {string} path
 * @property {string} [queryString]
 * @property {Object.<string, string>} headers
 * @property {string} body
 * @property {string} region
 * @property {string} service
 * 
 * @typedef {Object} Credentials
 * @property {string} accessKeyId
 * @property {string} secretAccessKey
 */

/**
 * @see https://docs.aws.amazon.com/ja_jp/general/latest/gr/signature-version-4.html
 */
class SigV4 {
  /**
   * @param {Request} request
   * @param {Credentials} credentials
   * @param {Date} date
   * @return {Request} signed request
   */
  static sign(request, credentials, date) {
    delete request.headers['Authorization'];

    // TASK 1
    const method = request.method.toUpperCase();
    const canonicalUri = encodeURI(encodeURI(request.path));
    const canonicalQueryString = request.queryString ?? ''; // 使用する場合は正規化が必要
    const canonicalHeaders = Object.entries(request.headers)
      .sort((a, b) => a[0].toLowerCase().localeCompare(b[0].toLowerCase()))
      .reduce((previous, current) => `${previous}${current[0].toLowerCase()}:${Util.trimall(current[1])}\n`, '');
    const signedHeaders =  Object.keys(request.headers)
      .map(key => key.toLowerCase())
      .sort()
      .join(';');
    const hashedPayload = Util.hexFrom(Util.sha256(request.body));
    const canonicalRequest = `${method}\n${canonicalUri}\n${canonicalQueryString}\n${canonicalHeaders}\n${signedHeaders}\n${hashedPayload}`;
    const hashedCanonicalRequest = Util.hexFrom(Util.sha256(canonicalRequest));

    // TASK 2
    const algorithm = 'AWS4-HMAC-SHA256';
    const requestDateTime = Util.amzTimestampFrom(date);
    const credentialScope = `${Util.amzDateFrom(date)}/${request.region}/${request.service}/aws4_request`;
    const stringToSign = `${algorithm}\n${requestDateTime}\n${credentialScope}\n${hashedCanonicalRequest}`;

    // TASK 3
    const kSecret = credentials.secretAccessKey;
    const kDate = Util.hmacSha256(Util.amzDateFrom(date), `AWS4${kSecret}`);
    const kRegion = Util.hmacSha256(request.region, kDate);
    const kService = Util.hmacSha256(request.service, kRegion);
    const kSigning = Util.hmacSha256('aws4_request', kService);
    const signature = Util.hexFrom(Util.hmacSha256(stringToSign, kSigning));

    // TASK 4
    request.headers['Authorization']
      = `${algorithm} Credential=${credentials.accessKeyId}/${credentialScope}, SignedHeaders=${signedHeaders}, Signature=${signature}`;

    return request;
  }
}

日付やハッシュ計算などはUtilクラスにまとめています。

日付形式

AWSで扱う日付及びタイムスタンプは以下の形式です。

日付:yyyyMMdd (UTC)
タイムスタンプ:yyyyMMddTHHmmssZ (UTC)

util.gs
class Util {
  /**
   * @param {Date} date
   * @return {string} AmzDate:yyyyMMdd (UTC)
   */
  static amzDateFrom(date) {
    return String(date.getUTCFullYear()) +
      String(date.getUTCMonth() + 1).padStart(2,'0') +
      String(date.getUTCDate()).padStart(2,'0');
  }

  /**
   * @param {Date} date
   * @return {string} AmzTimestamp:yyyyMMddTHHmmssZ
   */
  static amzTimestampFrom(date) {
    return String(date.getUTCFullYear()) +
      String(date.getUTCMonth() + 1).padStart(2,'0') +
      String(date.getUTCDate()).padStart(2,'0') +
      'T' +
      String(date.getUTCHours()).padStart(2,'0') +
      String(date.getUTCMinutes()).padStart(2,'0') +
      String(date.getUTCSeconds()).padStart(2,'0') +
      'Z';
  }
...

ハッシュ計算

GAS組み込みの Utilities に実装されているのでそれを利用します。

sha256

util.gs
...
  /**
   * @param {Byte[]} bytes
   * @return {string} hex string
   */
  static hexFrom(bytes) {
    return bytes.map(b => (b & 0xFF).toString(16).padStart(2,'0')).join('');
  }

  /**
   * @param {string} value
   * @return {Byte[]} digest
   */
  static sha256(value) {
    return Utilities.computeDigest(Utilities.DigestAlgorithm.SHA_256, value, Utilities.Charset.UTF_8);
  }
...

hmac sha256

util.gs
...
  /**
   * @param {string} value
   * @return {Byte[]} bytes
   */
  static bytesFrom(value) {
    return Utilities.base64Decode(Utilities.base64Encode(value, Utilities.Charset.UTF_8), Utilities.Charset.UTF_8);
  }

  /**
   * @param {string | Byte[]} value
   * @param {string | Byte[]} key
   * @return {Byte[]} signature
   */
  static hmacSha256(value, key) {
    if (typeof value === 'string') value = this.bytesFrom(value);
    if (typeof key === 'string') key = this.bytesFrom(key);
    return Utilities.computeHmacSha256Signature(value, key);
  }
...

API呼び出し

API呼び出し部分を実装します。
ライブラリとして読み込んだ場合に公開されるのは、グローバルスコープ(基本的にはトップレベルで定義された var 及び function)のみなので、 Container Bound Script から呼ばれる関数はそこに定義します。
また、GASの仕様で末尾に_の付く関数は公開されないため、privateな関数はそれで定義します。

athena.gs
// static params
const apiVersion = '2017-05-18';
const domain = 'amazonaws.com';
const method = 'post';
const protocol = 'https';
const service = 'athena';
const targetPrefix = 'AmazonAthena';
// configurations
let _options = {};

/**
 * @param {ClientConfiguration} options
 */
function setConfiguration(options) {
  _options = options;
}

/**
 * @param {GetQueryExecutionInput} params
 * @return {GetQueryExecutionOutput} result
 */
function getQueryExecution(params) {
  return callApi_('GetQueryExecution', params);
}

/**
 * @param {GetQueryResultsInput} params
 * @return {GetQueryResultsOutput} result
 */
function getQueryResults(params) {
  return callApi_('GetQueryResults', params);
}

/**
 * @param {StartQueryExecutionInput} params
 * @return {StartQueryExecutionOutput} result
 */
function startQueryExecution(params) {
  params.ClientRequestToken = params.ClientRequestToken ?? Utilities.getUuid(); // 冪等性を担保するためのトークン
  return callApi_('StartQueryExecution', params);
}

/**
 * @param {StopQueryExecutionInput} params
 * @return {StopQueryExecutionOutput} result
 */
function stopQueryExecution(params) {
  return callApi_('StopQueryExecution', params);
}

/**
 * @param {string} action
 * @param {Object} params
 * @return {Object} content
 */
function callApi_(action, params) {
  if (!_options.credentials) {
    throw 'Credentials is not defined.';
  }
  if (!_options.region) {
    throw 'Region is not defined.';
  }

  const now = new Date();
  const host = `${service}.${_options.region}.${domain}`;
  const payload = JSON.stringify(params);
  const headers = {
    'Content-Type': 'application/x-amz-json-1.1',
    'Host': host,
    'X-Amz-Content-Sha256': Util.hexFrom(Util.sha256(payload)),
    'X-Amz-Date': Util.amzTimestampFrom(now),
    'X-Amz-Target': `${targetPrefix}.${action}`,
  };
  const request = SigV4.sign({
    method: method,
    path: '/',
    headers: headers,
    body: payload,
    region: _options.region,
    service: service,
  }, _options.credentials, now);

  const endpoint = `${protocol}://${host}/`;
  delete request.headers['Host']; // HostヘッダーはUrlFetchAppで自動設定される
  const fetchParams = {
    method: request.method, headers: request.headers, payload: request.body,
  };
  const response = UrlFetchApp.fetch(endpoint, fetchParams);
  return JSON.parse(response.getContentText());
}

トップレベルのfunctionを使わず、クラスを定義する方法もありますが、クラス内で定義された関数は後述するJSDocによる補完が出来なかったため採用しませんでした。
また、SQLの発行に必要なAPIに絞って実装しているため、Amazon Athena API のすべてを網羅していません。

入力補完について

JSDoc を書くことで、スクリプトエディタ上で入力補完を受けることができます。しかし、GAS上では限定的にしか実装されていません。
特にライブラリでは@param@returnのみ使用でき、@typedefで型を定義することが出来ず、@param {string} obj.valueのような書き方でオブジェクトを表現することも出来ません。

上記の仕様のため、ライブラリ側だけで完結する事は出来ませんでした。
しかし、同一プロジェクト内に記載されたJSDocでは@typedefを利用することが出来ます。
そのため、型定義を別のファイルに切り出し、 Container Bound Script 側に同じ内容のファイルを作成することで、読み込んだライブラリの関数に定義された型を入力補完させることにしました。

athena.typedef.gs
/**
 * Arguments Types
 * 
 * @typedef {Object} CredentialsOptions
 * @property {string} accessKeyId
 * @property {string} secretAccessKey
 * 
 * @typedef {Object} ClientConfiguration
 * @property {CredentialsOptions} credentials
 * @property {string} region
 * 
 * Data Types
 * @see https://docs.aws.amazon.com/ja_jp/athena/latest/APIReference/API_Types.html
 * 
 * @typedef {Object} ColumnInfo
 * @property {string} Name
 * @property {string} Type
 * @property {boolean} [CaseSensitive]
 * @property {string} [CatalogName]
 * @property {string} [Label]
 * @property {string} [Nullable] 'NOT_NULL' | 'NULLABLE' | 'UNKNOWN'
 * @property {number} [Precision]
 * @property {number} [Scale]
 * @property {string} [SchemaName]
 * @property {string} [TableName]
 * 
 * @typedef {Object} Datum
 * @property {string} [VarCharValue]
 * 
 * @typedef {Object} EncryptionConfiguration
 * @property {string} EncryptionOption 'SSE_S3' | 'SSE_KMS' | 'CSE_KMS'
 * @property {string} [KmsKey]
 * 
 * @typedef {Object} EngineVersion
 * @property {string} [EffectiveEngineVersion]
 * @property {string} [SelectedEngineVersion]
 * 
 * @typedef {Object} QueryExecution
 * @property {EngineVersion} [EngineVersion]
 * @property {string} [Query]
 * @property {QueryExecutionContext} [QueryExecutionContext]
 * @property {string} [QueryExecutionId]
 * @property {ResultConfiguration} [ResultConfiguration]
 * @property {string} [StatementType] 'DDL' | 'DML' | 'UTILITY'
 * @property {QueryExecutionStatistics} [Statistics]
 * @property {QueryExecutionStatus} [Status]
 * @property {string} [WorkGroup]
 * 
 * @typedef {Object} QueryExecutionContext
 * @property {string} [Catalog]
 * @property {string} [Database]
 * 
 * @typedef {Object} QueryExecutionStatistics
 * @property {string} [DataManifestLocation]
 * @property {number} [DataScannedInBytes]
 * @property {number} [EngineExecutionTimeInMillis]
 * @property {number} [QueryPlanningTimeInMillis]
 * @property {number} [QueryQueueTimeInMillis]
 * @property {number} [ServiceProcessingTimeInMillis]
 * @property {number} [TotalExecutionTimeInMillis]
 * 
 * @typedef {Object} QueryExecutionStatus
 * @property {string} [CompletionDateTime]
 * @property {string} [State] 'QUEUED' | 'RUNNING' | 'SUCCEEDED' | 'FAILED' | 'CANCELLED'
 * @property {string} [StateChangeReason]
 * @property {string} [SubmissionDateTime]
 * 
 * @typedef {Object} ResultConfiguration
 * @property {EncryptionConfiguration} [EncryptionConfiguration]
 * @property {string} [OutputLocation]
 * 
 * @typedef {Object} ResultSet
 * @property {ResultSetMetadata} [ResultSetMetadata]
 * @property {Row[]} [Rows]
 * 
 * @typedef {Object} ResultSetMetadata
 * @property {ColumnInfo[]} [ColumnInfo]
 * 
 * @typedef {Object} Row
 * @property {Datum[]} [Contents]
 * 
 * API Operations Types
 * @see https://docs.aws.amazon.com/ja_jp/athena/latest/APIReference/API_Operations.html
 * 
 * @typedef {Object} GetQueryExecutionInput
 * @property {string} QueryExecutionId
 * 
 * @typedef {Object} GetQueryExecutionOutput
 * @property {QueryExecution} [QueryExecution]
 * 
 * @typedef {Object} GetQueryResultsInput
 * @property {string} QueryExecutionId
 * @property {number} [MaxResults]
 * @property {string} [NextToken]
 * 
 * @typedef {Object} GetQueryResultsOutput
 * @property {string} [NextToken]
 * @property {ResultSet} [ResultSet]
 * @property {number} [UpdateCount]
 * 
 * @typedef {Object} StartQueryExecutionInput
 * @property {string} QueryString
 * @property {string} [ClientRequestToken]
 * @property {QueryExecutionContext} [QueryExecutionContext]
 * @property {ResultConfiguration} [ResultConfiguration]
 * @property {string} [WorkGroup]
 * 
 * @typedef {Object} StartQueryExecutionOutput
 * @property {string} [QueryExecutionId]
 * 
 * @typedef {Object} StopQueryExecutionInput
 * @property {string} [QueryExecutionId]
 * 
 * @typedef {Object} StopQueryExecutionOutput
 * @property {string} [nothing]
 * 
 */

ライブラリのデプロイと読み込み

スクリプトエディタの右上にある[デプロイ]ボタンから、種類にライブラリを選択してデプロイ出来ます。
デプロイするとバージョンが付与され、読み込み側でバージョン指定を変更するまで内容が変わらず利用できます。

開発中や個人利用でバージョン管理の必要が無い場合は、デプロイせずにライブラリとして読み込むことも出来ます。

スクリプトエディタのプロジェクトの設定から、スクリプトIDをコピーします。
image.png

利用側のスクリプトエディタでライブラリの追加から、コピーしたIDを張り付けて読み込みます。
※デプロイ時に表示されるデプロイIDではありません
image.png

image.png

使い方

Amazon Athena API でのクエリ実行は非同期になっているので、StartQueryExecution, GetQueryExecution, GetQueryResults を順に呼び出す必要があります。
呼び出し側でquery関数にまとめていますが、必要なパラメータを整理してライブラリ側で実装しても良いでしょう。

コード.gs
const AWS_ACCESS_KEY_ID = 'accesskeyid';
const AWS_SECRET_ACCESS_KEY = 'secretaccesskey';
const AWS_DEFAULT_REGION = 'ap-northeast-1';

function loadData() {
  AmazonAthena.setConfiguration({
    credentials: { accessKeyId: AWS_ACCESS_KEY_ID, secretAccessKey: AWS_SECRET_ACCESS_KEY },
    region: AWS_DEFAULT_REGION,
  });

  const sql = 'SELECT NOW()';

  const sheet = SpreadsheetApp.getActiveSpreadsheet().getSheetByName('シート1').clear();
  const setRows = (rows) => { sheet.getRange(sheet.getLastRow()+1, 1, rows.length, rows[0].length).setValues(rows) };
  query(sql, 500, setRows);
  console.log(`output ${sheet.getLastRow()} rows.`);
}

/**
 * StartQueryExecutionのAPIに渡すパラメータ(WorkGroup, Catalog, Database など)は全てデフォルトになります。
 * ワークグループにはクエリの結果の場所が設定されている必要があります。
 * 
 * @param {string} sql
 * @param {number} maxResults
 * @param {Function} callback
 */
function query(sql, maxResults, callback) {
  const query = AmazonAthena.startQueryExecution({ QueryString: sql });

  let queryExecution;
  while(true) {
    queryExecution = AmazonAthena.getQueryExecution({ QueryExecutionId: query.QueryExecutionId });
    if (['QUEUED', 'RUNNING'].includes(queryExecution.QueryExecution?.Status?.State)) {
      Utilities.sleep(500);
    }
    else {
      break;
    }
  }

  let queryResult;
  let nextToken = null;
  while(true) {
    queryResult = AmazonAthena.getQueryResults({
      QueryExecutionId: queryExecution.QueryExecution?.QueryExecutionId,
      MaxResults: maxResults,
      NextToken: nextToken,
    });
    const chunk = queryResult.ResultSet.Rows.map(row => row.Data.map(d => d.VarCharValue));
    if (chunk.length > 0) callback(chunk);
    nextToken = queryResult.NextToken;
    if (!nextToken) break;
  }
}
2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0