こんにちは。BaaS界のサンタクロースになりたいstakezakiです。
弊社のBaaSであるvte.cxとBigQueryとの連携でStreaming APIを使ったので、その辺りについて書きたいと思います。(vte.cxについては、こちらをご覧ください)
なぜBaaSとBigQueryを連携させようと思ったのか
vte.cxのデータストアはKVSなので大量のトランザクションを捌くことは得意なのですがデータ分析は苦手です。一方、BigQueryは分析は得意ですがトランザクション更新系は不得意です。
以下がvte.cxとBigQueryの連携のイメージなんですが、要はオンライントランザクション処理をvte.cxで行って、同時に履歴データをBigQueryに入れることで分析も行えたら素晴らしいんじゃないかと思ったわけです。
vte.cxとBigQueryの両方の得意なところを組み合わせて、オンライン処理+分析に使おうというのが目的です。
BigQueryにJSON形式で出し入れできることの意味
BigQueryは「DWHとしては決定版だと思います。これ以上のものはもう量子コンピュータを待たないと出来ないでしょう。-GCP愛を語る-」とありますが、私もそう思います。Hadoopベースの他のサービスなんてアホらしくて使ってられないと私は思うのですが不思議なことに使っている人はいるんですよねえ、なぜだろ。
さらに、これはBigQueryの優れた機能の一つなのですが、JSON形式でインポートすることができ、ネストされた構造や繰り返しの構造をサポートしています。
vte.cxへのデータ操作はJSONが基本です。この機能により、ユーザがWebアプリケーションにデータ登録するJSONと同じものをBigQueryにもインポートできるのです。
また、これまでアプリケーションのRDB等で貯めたデータを非同期に移すのではなく、ほぼリアルタイムにアプリケーションから直接インサートされるイメージになります。
しかも、BigQueryに投入したデータをアプリケーションからも参照することができます。
数TBのデータを数秒〜十数秒のレスポンスタイムで検索可能なわけですから、オンラインのWeb系でも十分に使えます。
つまり、vte.cx+BigQueryによって、オンライントランザクション機能がBigQueryにもたらされるのです。
もうこれは、Webアプリケーション バックエンドの鉄板ソリューションですね。
格納後にJSONで読めなくて苦労した
実は、JSONの格納は割と簡単にできたのですが、JSONの形で取り出すことが大変なことが後でわかったんです。入れたときと同じ形でJSONを取り出そうと思っても簡単にはいきません。
そこで、以下のようなクエリを組み立てることでなんとか解決しました。これはあるスキーマに対応するJSON出力用のクエリのサンプルです。すべてのネスト項目についてFLATTENで展開してからJSONを組み立てます。力技ですね。
(まあ、他にいい方法があれば誰か教えてください。あと、履歴の取り方にもよりますが莫大なデータ量になるとも思えないし、コストも心配ないでしょう、たぶん。)
これでも数秒〜十数秒しかかかりません。恐るべしBigQuery。
select flg, src.id as id, updated, rights____type, title, link.___rel, link.___href, link.___num, contributor.uri, contributor.name, contributor.___num, author.uri, author.name, author.___num, user_parent.___num, user_parent.parent_text, user_parent.user_child.___num, user_parent.user_child.child_text, user_pine.___num, user_pine.pine_text, user_pine.user_bamboo.___num, user_pine.user_bamboo.bamboo_text, user_pine.user_bamboo.user_plum.___num, user_pine.user_bamboo.user_plum.plum_text, user_paintset.paintset_text, user_paintset.user_palette.palette_text, user_paintset.user_palette.palette_size, user_paintset.user_paintbrush.paintbrush_text, user_paintset.user_paintbrush.user_measure.length_text, user_paintset.user_paintbrush.user_measure.weight_text, user_paintset.user_colors.___num, user_paintset.user_colors.colors_text from (
select flg, ___key, ___revision, id, updated, rights____type, title, link.___rel, link.___href, link.___num, contributor.uri, contributor.name, contributor.___num, author.uri, author.name, author.___num, user_parent.___num, user_parent.parent_text, user_parent.user_child.___num, user_parent.user_child.child_text, user_pine.___num, user_pine.pine_text, user_pine.user_bamboo.___num, user_pine.user_bamboo.bamboo_text, user_pine.user_bamboo.user_plum.___num, user_pine.user_bamboo.user_plum.plum_text, user_paintset.paintset_text, user_paintset.user_palette.palette_text, user_paintset.user_palette.palette_size, user_paintset.user_paintbrush.paintbrush_text, user_paintset.user_paintbrush.user_measure.length_text, user_paintset.user_paintbrush.user_measure.weight_text, user_paintset.user_colors.___num, user_paintset.user_colors.colors_text from
(select '0' as flg, ___key, ___revision, id, updated, rights____type, title, user_paintset.paintset_text, user_paintset.user_palette.palette_text, user_paintset.user_palette.palette_size, user_paintset.user_paintbrush.paintbrush_text, user_paintset.user_paintbrush.user_measure.length_text, user_paintset.user_paintbrush.user_measure.weight_text from [test_dataset.test_listnum_table]),
(FLATTEN((select 'link' as flg, id, link.___num, link.___rel, link.___href from [test_dataset.test_listnum_table]), link)),
(FLATTEN((select 'contributor' as flg, id, contributor.___num, contributor.uri, contributor.name from [test_dataset.test_listnum_table]), contributor)),
(FLATTEN((select 'author' as flg, id, author.___num, author.uri, author.name from [test_dataset.test_listnum_table]), author)),
(FLATTEN((select 'user_parent' as flg, id, user_parent.___num, user_parent.parent_text from [test_dataset.test_listnum_table]), user_parent)),
(FLATTEN((select 'user_parent.user_child' as flg, id, user_parent.___num, user_parent.user_child.___num, user_parent.user_child.child_text from
(FLATTEN((select id, user_parent.___num, user_parent.user_child.___num, user_parent.user_child.child_text from [test_dataset.test_listnum_table]), user_parent)
)
), user_parent.user_child)),
(FLATTEN((select 'user_pine' as flg, id, user_pine.___num, user_pine.pine_text from [test_dataset.test_listnum_table]), user_pine)),
(FLATTEN((select 'user_pine.user_bamboo' as flg, id, user_pine.___num, user_pine.user_bamboo.___num, user_pine.user_bamboo.bamboo_text from
(FLATTEN((select id, user_pine.___num, user_pine.user_bamboo.___num, user_pine.user_bamboo.bamboo_text from [test_dataset.test_listnum_table]), user_pine))
), user_pine.user_bamboo)),
(FLATTEN((select 'user_pine.user_bamboo.user_plum' as flg, id, user_pine.___num, user_pine.user_bamboo.___num, user_pine.user_bamboo.user_plum.___num, user_pine.user_bamboo.user_plum.plum_text from
(FLATTEN((select id, user_pine.___num, user_pine.user_bamboo.___num, user_pine.user_bamboo.user_plum.___num, user_pine.user_bamboo.user_plum.plum_text from
(FLATTEN((select id, user_pine.___num, user_pine.user_bamboo.___num, user_pine.user_bamboo.user_plum.___num, user_pine.user_bamboo.user_plum.plum_text from [test_dataset.test_listnum_table]), user_pine))
), user_pine.user_bamboo))
), user_pine.user_bamboo.user_plum)),
(FLATTEN((select 'user_paintset.user_colors' as flg, id, user_paintset.user_colors.___num, user_paintset.user_colors.colors_text from [test_dataset.test_listnum_table]), user_paintset.user_colors)),
) src
JOIN EACH
(select id from
(FLATTEN((select ___key, ___revision, id, updated, rights____type, title, link.___rel, link.___href, link.___num, contributor.uri, contributor.name, contributor.___num, author.uri, author.name, author.___num, user_parent.___num, user_parent.parent_text, user_parent.user_child.___num, user_parent.user_child.child_text, user_pine.___num, user_pine.pine_text, user_pine.user_bamboo.___num, user_pine.user_bamboo.bamboo_text, user_pine.user_bamboo.user_plum.___num, user_pine.user_bamboo.user_plum.plum_text, user_paintset.paintset_text, user_paintset.user_palette.palette_text, user_paintset.user_palette.palette_size, user_paintset.user_paintbrush.paintbrush_text, user_paintset.user_paintbrush.user_measure.length_text, user_paintset.user_paintbrush.user_measure.weight_text, user_paintset.user_colors.___num, user_paintset.user_colors.colors_text
from
(FLATTEN((select ___key, ___revision, id, updated, rights____type, title, link.___rel, link.___href, link.___num, contributor.uri, contributor.name, contributor.___num, author.uri, author.name, author.___num, user_parent.___num, user_parent.parent_text, user_parent.user_child.___num, user_parent.user_child.child_text, user_pine.___num, user_pine.pine_text, user_pine.user_bamboo.___num, user_pine.user_bamboo.bamboo_text, user_pine.user_bamboo.user_plum.___num, user_pine.user_bamboo.user_plum.plum_text, user_paintset.paintset_text, user_paintset.user_palette.palette_text, user_paintset.user_palette.palette_size, user_paintset.user_paintbrush.paintbrush_text, user_paintset.user_paintbrush.user_measure.length_text, user_paintset.user_paintbrush.user_measure.weight_text, user_paintset.user_colors.___num, user_paintset.user_colors.colors_text from
(FLATTEN((select ___key, ___revision, id, updated, rights____type, title, link.___rel, link.___href, link.___num, contributor.uri, contributor.name, contributor.___num, author.uri, author.name, author.___num, user_parent.___num, user_parent.parent_text, user_parent.user_child.___num, user_parent.user_child.child_text, user_pine.___num, user_pine.pine_text, user_pine.user_bamboo.___num, user_pine.user_bamboo.bamboo_text, user_pine.user_bamboo.user_plum.___num, user_pine.user_bamboo.user_plum.plum_text, user_paintset.paintset_text, user_paintset.user_palette.palette_text, user_paintset.user_palette.palette_size, user_paintset.user_paintbrush.paintbrush_text, user_paintset.user_paintbrush.user_measure.length_text, user_paintset.user_paintbrush.user_measure.weight_text, user_paintset.user_colors.___num, user_paintset.user_colors.colors_text from
(FLATTEN((select ___key, ___revision, id, updated, rights____type, title, link.___rel, link.___href, link.___num, contributor.uri, contributor.name, contributor.___num, author.uri, author.name, author.___num, user_parent.___num, user_parent.parent_text, user_parent.user_child.___num, user_parent.user_child.child_text, user_pine.___num, user_pine.pine_text, user_pine.user_bamboo.___num, user_pine.user_bamboo.bamboo_text, user_pine.user_bamboo.user_plum.___num, user_pine.user_bamboo.user_plum.plum_text, user_paintset.paintset_text, user_paintset.user_palette.palette_text, user_paintset.user_palette.palette_size, user_paintset.user_paintbrush.paintbrush_text, user_paintset.user_paintbrush.user_measure.length_text, user_paintset.user_paintbrush.user_measure.weight_text, user_paintset.user_colors.___num, user_paintset.user_colors.colors_text from
(FLATTEN((select ___key, ___revision, id, updated, rights____type, title, link.___rel, link.___href, link.___num, contributor.uri, contributor.name, contributor.___num, author.uri, author.name, author.___num, user_parent.___num, user_parent.parent_text, user_parent.user_child.___num, user_parent.user_child.child_text, user_pine.___num, user_pine.pine_text, user_pine.user_bamboo.___num, user_pine.user_bamboo.bamboo_text, user_pine.user_bamboo.user_plum.___num, user_pine.user_bamboo.user_plum.plum_text, user_paintset.paintset_text, user_paintset.user_palette.palette_text, user_paintset.user_palette.palette_size, user_paintset.user_paintbrush.paintbrush_text, user_paintset.user_paintbrush.user_measure.length_text, user_paintset.user_paintbrush.user_measure.weight_text, user_paintset.user_colors.___num, user_paintset.user_colors.colors_text from
(FLATTEN((select ___key, ___revision, id, updated, rights____type, title, link.___rel, link.___href, link.___num, contributor.uri, contributor.name, contributor.___num, author.uri, author.name, author.___num, user_parent.___num, user_parent.parent_text, user_parent.user_child.___num, user_parent.user_child.child_text, user_pine.___num, user_pine.pine_text, user_pine.user_bamboo.___num, user_pine.user_bamboo.bamboo_text, user_pine.user_bamboo.user_plum.___num, user_pine.user_bamboo.user_plum.plum_text, user_paintset.paintset_text, user_paintset.user_palette.palette_text, user_paintset.user_palette.palette_size, user_paintset.user_paintbrush.paintbrush_text, user_paintset.user_paintbrush.user_measure.length_text, user_paintset.user_paintbrush.user_measure.weight_text, user_paintset.user_colors.___num, user_paintset.user_colors.colors_text from
(FLATTEN((select ___key, ___revision, id, updated, rights____type, title, link.___rel, link.___href, link.___num, contributor.uri, contributor.name, contributor.___num, author.uri, author.name, author.___num, user_parent.___num, user_parent.parent_text, user_parent.user_child.___num, user_parent.user_child.child_text, user_pine.___num, user_pine.pine_text, user_pine.user_bamboo.___num, user_pine.user_bamboo.bamboo_text, user_pine.user_bamboo.user_plum.___num, user_pine.user_bamboo.user_plum.plum_text, user_paintset.paintset_text, user_paintset.user_palette.palette_text, user_paintset.user_palette.palette_size, user_paintset.user_paintbrush.paintbrush_text, user_paintset.user_paintbrush.user_measure.length_text, user_paintset.user_paintbrush.user_measure.weight_text, user_paintset.user_colors.___num, user_paintset.user_colors.colors_text from
(FLATTEN((select ___key, ___revision, id, updated, rights____type, title, link.___rel, link.___href, link.___num, contributor.uri, contributor.name, contributor.___num, author.uri, author.name, author.___num, user_parent.___num, user_parent.parent_text, user_parent.user_child.___num, user_parent.user_child.child_text, user_pine.___num, user_pine.pine_text, user_pine.user_bamboo.___num, user_pine.user_bamboo.bamboo_text, user_pine.user_bamboo.user_plum.___num, user_pine.user_bamboo.user_plum.plum_text, user_paintset.paintset_text, user_paintset.user_palette.palette_text, user_paintset.user_palette.palette_size, user_paintset.user_paintbrush.paintbrush_text, user_paintset.user_paintbrush.user_measure.length_text, user_paintset.user_paintbrush.user_measure.weight_text, user_paintset.user_colors.___num, user_paintset.user_colors.colors_text from
(FLATTEN((select ___key, ___revision, id, updated, rights____type, title, link.___rel, link.___href, link.___num, contributor.uri, contributor.name, contributor.___num, author.uri, author.name, author.___num, user_parent.___num, user_parent.parent_text, user_parent.user_child.___num, user_parent.user_child.child_text, user_pine.___num, user_pine.pine_text, user_pine.user_bamboo.___num, user_pine.user_bamboo.bamboo_text, user_pine.user_bamboo.user_plum.___num, user_pine.user_bamboo.user_plum.plum_text, user_paintset.paintset_text, user_paintset.user_palette.palette_text, user_paintset.user_palette.palette_size, user_paintset.user_paintbrush.paintbrush_text, user_paintset.user_paintbrush.user_measure.length_text, user_paintset.user_paintbrush.user_measure.weight_text, user_paintset.user_colors.___num, user_paintset.user_colors.colors_text from [test_dataset.test_listnum_table]),
user_pine))),
user_pine.user_bamboo))),
user_pine.user_bamboo.user_plum))),
user_paintset.user_colors))),
user_parent))),
user_parent.user_child))),
author))),
contributor))),
link)) nest_src
JOIN EACH
(select max(___revision) as max____revision, ___key as max____key from [test_dataset.test_listnum_table]
GROUP EACH BY max____key) mx
ON nest_src.___revision=mx.max____revision and nest_src.___key=mx.max____key
where (link.___rel="self" or link.___rel="alternate")
and regexp_match(link.___href, '^/rrr/[^/]+$')
and rights____type is null
and regexp_match(title, "^.*ユーザ項目.*$")
GROUP EACH by id) grp
ON src.id = grp.id
GROUP BY flg, src.id, id, updated, rights____type, title, link.___rel, link.___href, link.___num, contributor.uri, contributor.name, contributor.___num, author.uri, author.name, author.___num, user_parent.___num, user_parent.parent_text, user_parent.user_child.___num, user_parent.user_child.child_text, user_pine.___num, user_pine.pine_text, user_pine.user_bamboo.___num, user_pine.user_bamboo.bamboo_text, user_pine.user_bamboo.user_plum.___num, user_pine.user_bamboo.user_plum.plum_text, user_paintset.paintset_text, user_paintset.user_palette.palette_text, user_paintset.user_palette.palette_size, user_paintset.user_paintbrush.paintbrush_text, user_paintset.user_paintbrush.user_measure.length_text, user_paintset.user_paintbrush.user_measure.weight_text, user_paintset.user_colors.___num, user_paintset.user_colors.colors_text
ORDER BY src.id, flg, link.___num, contributor.___num, author.___num, user_parent.___num, user_parent.user_child.___num, user_pine.___num, user_pine.user_bamboo.___num, user_pine.user_bamboo.user_plum.___num, user_paintset.user_colors.___num
ざっくりとした要点
- flgでデータ取得先を判定可能。
- entry1件につき1件しかない項目を、flg="0"に指定する。
- リスト項目を、flg={リスト項目名} のUNIONテーブルに指定する。
- UNION + JOINがNGなのでUNIONをサブクエリーとする。
- JOIN EACH 以降のサブクエリーは、条件のある項目のみ指定すればOK。
- 上記SQLを実行するとリスト項目はフラット化されそのままでは元に戻す手段がないため、あらかじめデータ登録時にリストのインデックスを付加する。
- ネストされたリストの場合、リストの各データの階層に「___num」項目をつける。
- ___num項目にはBigQueryに登録時、POSTデータにこの項目にリストの順番で、0から始まる数字を設定する。
- データを検索してEntryオブジェクトに戻す際、この順番通りにリストを復元し、JSONを作成してEntryオブジェクトを生成する。
- 最後のGROUP BYはデータの二重登録に対応。
- BigQueryはキーがないので同じidのデータを複数登録できてしまう。誤って複数登録されてしまった場合の対応としてGROUP BYを指定。
- TYPEがRECORDであっても、MODEがREPEATEDでなければFLATTENしない。
- REPEATEDでないRECORDは、flgをその上位階層(一番上であれば0)とする。
REST APIでアクセスする方法
BigQueryにはClient Libraryがあって、それを使うのが一番楽だし確実だと思うのですが、REST APIを使いこなせるようになると潰しが効くというか妙に自信が湧いてきます。
サンプルもあまりないようなので、汚いコードですが晒しておきます。
アクセストークンさえ取得できれば一通り実行できます。
streamInsert()がStreamingAPIです。insert()はバッチのインサートなのでご注意ください。
public class BQDataInsert {
public static final String CONTENTTYPEJSON = "application/json; charset=UTF-8";
public static final String METHODPOST ="POST";
public static final String METHODPUT ="PUT";
public static final String UTF8 ="UTF-8";
public static final int MAXROWS = 500;
public static final int MAXHTTPSIZE = 1024*1024*9; // HTTP request size limit: 10 MB
/*
* see https://cloud.google.com/bigquery/loading-data-post-request#multipart
*/
public static BQPostResult insert(String accesstoken,String projectId,String datasetId,String tableId,String json) throws ClientProtocolException, IOException, ParseException, InstantiationException, IllegalAccessException, NoSuchFieldException, SecurityException {
String url = "https://www.googleapis.com/upload/bigquery/v2/projects/"+URLEncoder.encode(projectId, UTF8)+"/jobs?uploadType=multipart";
String BOUNDARY = "reflextaggingservice";
return doPost(accesstoken,projectId,url,METHODPOST,"multipart/related; boundary=\""+BOUNDARY+"\"",getInsertJSON(projectId,datasetId,tableId,json,BOUNDARY));
}
public static BQPostResult streamInsert(String accesstoken,String projectId,String datasetId,String tableId,FeedBase feed,IResourceMapper mapper) throws ClientProtocolException, IOException, ParseException, InstantiationException, IllegalAccessException, NoSuchFieldException, SecurityException, SizeLimitExceededException {
String url = "https://www.googleapis.com/bigquery/v2/projects/"+URLEncoder.encode(projectId, UTF8)+"/datasets/"+datasetId+"/tables/"+tableId+"/insertAll";
BQPostResult result = new BQPostResult();
result.rc = -1;
Context ctx = new BQDataInsert().new Context();
ctx.idx =0;
while(ctx.idx<feed.entry.size()) {
String json = getStreamInsertJSON(feed,mapper,ctx);
// 3回リトライ
for(int i=0;i<3;i++) {
result = doPostGzip(accesstoken,projectId,url,METHODPOST,CONTENTTYPEJSON,json);
if (result.rc==200) break;
else {
try{
Thread.sleep(1000); //1000ミリ秒Sleepする
}catch(InterruptedException e){}
}
}
if (result.rc!=200) break;
}
return result;
}
public static BQPostResult createDataset(String accesstoken,String projectId) throws ClientProtocolException, IOException, ParseException, InstantiationException, IllegalAccessException, NoSuchFieldException, SecurityException {
String url = "https://www.googleapis.com/bigquery/v2/projects/"+URLEncoder.encode(projectId, UTF8)+"/datasets";
return doPost(accesstoken,projectId,url,METHODPOST,CONTENTTYPEJSON,getDatasetJson(projectId));
}
public static BQPostResult createTable(String accesstoken,String projectId,String datasetId,String tableId,String schema) throws ClientProtocolException, IOException, ParseException, InstantiationException, IllegalAccessException, NoSuchFieldException, SecurityException {
String url = "https://www.googleapis.com/bigquery/v2/projects/"+URLEncoder.encode(projectId, UTF8)+"/datasets/"+URLEncoder.encode(datasetId, UTF8)+"/tables";
return doPost(accesstoken,projectId,url,METHODPOST,CONTENTTYPEJSON,getTableJson(projectId,datasetId,tableId,schema));
}
public static BQPostResult updateTable(String accesstoken,String projectId,String datasetId,String tableId,String schema) throws ClientProtocolException, IOException, ParseException, InstantiationException, IllegalAccessException, NoSuchFieldException, SecurityException {
String url = "https://www.googleapis.com/bigquery/v2/projects/"+URLEncoder.encode(projectId, UTF8)+"/datasets/"+URLEncoder.encode(datasetId, UTF8)+"/tables/"+tableId;
return doPost(accesstoken,projectId,url,METHODPUT,CONTENTTYPEJSON,getTableJson(projectId,datasetId,tableId,schema));
}
private static BQPostResult doPost(String accesstoken,String projectId,String url,String method,String contenttype,String json) throws IOException, ParseException, InstantiationException, IllegalAccessException, NoSuchFieldException, SecurityException {
Requester requester = new Requester();
HashMap<String,String> property = new HashMap<String,String>();
property.put("Authorization", "Bearer "+accesstoken);
property.put("Content-Type",contenttype);
HttpURLConnection http = requester.prepare(url, method, property);
PrintWriter pw =new PrintWriter(http.getOutputStream());
pw.println(json);
System.out.println(json);
pw.flush();
pw.close();
BQPostResult bqpostResult;
int rc = http.getResponseCode();
InputStream is;
if (rc == 200) {
is = http.getInputStream();
} else {
is = http.getErrorStream();
}
BufferedReader br = new BufferedReader(new InputStreamReader(is));
StringBuilder sb = new StringBuilder();
String str = br.readLine();
while (str != null) {
sb.append(str);
str = br.readLine();
}
System.out.println("resp:"+sb.toString());
br.close();
Object o = (new JSONParser()).parse(sb.toString());
bqpostResult = (BQPostResult) unmarshal(o, BQPostResult.class);
bqpostResult.rc = rc;
return bqpostResult;
}
private static BQPostResult doPostGzip(String accesstoken,String projectId,String url,String method,String contenttype,String json) throws IOException, ParseException, InstantiationException, IllegalAccessException, NoSuchFieldException, SecurityException {
Requester requester = new Requester();
HashMap<String,String> property = new HashMap<String,String>();
property.put("Accept-Encoding", "gzip");
property.put("Authorization", "Bearer "+accesstoken);
property.put("User-Agent", "ReflexWorks Bigquery Client");
property.put("Content-Type",contenttype);
property.put("Content-Encoding","gzip");
HttpURLConnection http = requester.prepare(url, method, property);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(baos);
gzip.write(json.getBytes(Charset.forName("UTF8")));
gzip.close();
http.getOutputStream().write(baos.toByteArray());
System.out.println(json);
BQPostResult bqpostResult;
int rc = http.getResponseCode();
InputStream is;
if (rc == 200) {
is = http.getInputStream();
} else {
is = http.getErrorStream();
}
BufferedReader br = new BufferedReader(new InputStreamReader(new GZIPInputStream(is)));
StringBuilder sb = new StringBuilder();
String str = br.readLine();
while (str != null) {
sb.append(str);
str = br.readLine();
}
System.out.println("resp:"+sb.toString());
br.close();
Object o = (new JSONParser()).parse(sb.toString());
bqpostResult = (BQPostResult) unmarshal(o, BQPostResult.class);
bqpostResult.rc = rc;
return bqpostResult;
}
private static String getStreamInsertJSON(FeedBase feed,IResourceMapper mapper,Context ctx) throws SizeLimitExceededException {
String result =
"{"+
"\"kind\": \"bigquery#tableDataInsertAllRequest\","+
"\"skipInvalidRows\": false,"+
"\"ignoreUnknownValues\": true,"+
"\"rows\":["+ getInsertIdJson(mapper,feed,ctx) +
"]"+
"}";
return result;
}
private class Context {
public int idx;
}
private static String getInsertIdJson(IResourceMapper mapper,FeedBase feed,Context ctx) throws SizeLimitExceededException {
StringBuilder sb = new StringBuilder();
for(int i=1;;i++,ctx.idx++) {
EntryBase entry = feed.getEntry().get(ctx.idx);
sb.append("{"+
" \"insertId\": \""+entry.id+"\","+
" \"json\": "+ BQJSONSerializer.toJSON(mapper,entry) +
"}");
// maxrows: 500
// maxrowsize: 1MB
if ((feed.getEntry().size()-1)<=ctx.idx||i>=MAXROWS||sb.toString().length()>MAXHTTPSIZE) {
ctx.idx++;
break;
}
sb.append(",");
}
return sb.toString();
}
private static String getInsertJSON(String projectId,String datasetId,String tableId,String json,String BOUNDARY) {
String result ="\n"+
"--"+BOUNDARY+"\n"+
"Content-Type: application/json; charset=UTF-8\n"+
"\n"+
getConfiguration(projectId,datasetId,tableId) + "\n"+
"\n"+
"--"+BOUNDARY+"\n"+
"Content-Type: application/octet-stream\n"+
"\n"+
json +"\n"+
"\n"+
"--"+BOUNDARY+"--";
return result;
}
private static String getConfiguration(String projectId,String datasetId,String tableId) {
String result = "{"+
"\"configuration\": {" +
"\"load\": {" +
"\"sourceFormat\": \"NEWLINE_DELIMITED_JSON\"," +
"\"destinationTable\": {" +
"\"projectId\": \""+projectId+"\"," +
"\"datasetId\": \""+datasetId+"\"," +
"\"tableId\": \""+tableId+"\"" +
"}" +
"}" +
"}" +
"}";
return result;
}
private static String getDatasetJson(String projectId) {
String result = "{"+
"\"datasetReference\": {"+
"\"projectId\": \""+projectId+"\","+
"\"datasetId\": \"vtecx\""+
"}"+
"}";
return result;
}
private static String getTableJson(String projectId, String datasetId,
String tableId,String schema) {
String result = "{"+
"\"tableReference\": {"+
"\"datasetId\": \""+datasetId+"\","+
"\"projectId\": \""+projectId+"\","+
"\"tableId\": \""+tableId+"\""+
"},"+
"\"schema\": {"+
"\"fields\": "+ schema +
"}"+
"}";
return result;
}
private static Object unmarshal(Object obj, Class clazz)
throws InstantiationException, IllegalAccessException,
NoSuchFieldException, SecurityException {
if (clazz == String.class)
return (String)obj;
if (clazz == Integer.class)
return (Integer)obj;
if (clazz == Long.class)
return (Long)obj;
if (clazz == List.class)
return (JSONArray)obj;
JSONObject jObj = (JSONObject)obj;
Object o = clazz.newInstance();
for (Object s: jObj.keySet()) {
try{
Field f = clazz.getField((String)s);
Object inner = unmarshal(jObj.get(s), f.getType());
f.set(o, inner);
}catch(NoSuchFieldException e) {
// do nothing
}
}
return o;
}
}