今回は、Step3 です。Step2 で作成したコードをベースに編集を行いますので、Twitter API についての説明を省略します。Twitter API については、Step1、Step2 をご参照ください。
#講座の最終目標
Twitter のツイート データを Azure Data Lake Storage Gen2 (ADLS Gen2) に Parquet 形式のファイルとして自動的に蓄積し、Azure Synapse Analytics (Serverless SQL / Apach Spark) を使って分析できるようにします。ツイート データの継続的な取得と ADLS Gen2 へのデータ蓄積には、Azure Functions を利用します。
#Step3 の目標
Parquet ファイル出力 を C# からできるようにすることが目標となります。Parquet 形式のファイルは、Hadoop (Azure HDInsight) / Spark (Azure Databricks, Azure Synapse - Apache Spark) / Azure Synapse - Serverless SQL など、ビッグデータ処理には欠かせないスケーラブルなカラムストア型ファイルになります。Step1、Step2 同様に、Azure は関係なく、ローカル環境に単純なコンソール アプリケーションを作成し、動作させます。
#開発環境 (OSS)
開発には、以下を利用します。OSS (無償) かつ クロス プラットフォームとなりますので、Windows / Mac / Linux などお好きな OS/デバイスをご利用ください。
- ツール : Visual Studio 2019 Community or VS Code
- ランタイム : .NET 5 or .NET Core 3.1
- 言語 : C#
#C# コードの開発
今回の手順では、Visual Studio 2019 を利用します。
##1. Step2 で作成したプロジェクトを開きます
Visual Studio 2019 を起動し、Step2 で作成したプロジェクトを開きます。
##2. Parquet ファイル入出力用 Nuget パッケージの取得
Visual Studio の「Nuget パッケージの管理」機能を使って、(Apache Parquet for .Net Platform) を取得します。
##3. using ディレクティブの追加(コード)
Parquet ファイル出力に必要なライブラリ分を追加します。
using System;
using System.IO;
using System.Threading.Tasks;
using System.Collections.Generic;
using Tweetinvi;
using Parquet.Data;
using Parquet;
##4. エンティティ クラスの定義(コード)
ファイル出力用にツイートデータを格納する為のエンティティ クラスを定義します。ストリームから取得可能なツイート作成日時、作成者、作成ソース、ツイート テキストの4項目を利用します。カラムストアへの変換の容易性を考慮し、項目毎にリストで定義します。
private class TweetsEntity
{
public List<DateTimeOffset> CreatedAt { set; get; }
public List<string> CreatedBy { set; get; }
public List<string> Source { set; get; }
public List<string> Text { set; get; }
}
##5. エンティティ オブジェクトの初期化と編集(コード)
Main メソッドでエンティティ オブジェクトを初期化し、MatchingTweetReceived イベントハンドラーで編集を行います。
var tweets = new TweetsEntity();
tweets.CreatedAt = new List<DateTimeOffset>();
tweets.CreatedBy = new List<string>();
tweets.Source = new List<string>();
tweets.Text = new List<string>();
stream.MatchingTweetReceived += (sender, args) =>
{
var lang = args.Tweet.Language;
// Specify Japanese & Remove bot tweets
if (lang == Tweetinvi.Models.Language.Japanese && args.Tweet.Source.Contains(">Twitter "))
{
Console.WriteLine("----------------------------------------------------------------------");
Console.WriteLine($"** CreatedAt : {args.Tweet.CreatedAt}");
Console.WriteLine($"** CreatedBy : {args.Tweet.CreatedBy}");
Console.WriteLine($"** Source : {args.Tweet.Source}");
Console.WriteLine($"** Text : {args.Tweet.Text}");
tweets.CreatedAt.Add(args.Tweet.CreatedAt);
tweets.CreatedBy.Add(args.Tweet.CreatedBy.ToString());
tweets.Source.Add(args.Tweet.Source);
tweets.Text.Add(args.Tweet.Text);
}
++counter;
if (counter > maxCount)
{
stream.Stop();
}
};
##6. Parquet スキーマ作成(コード)
出力対象のカラム属性を定義し、スキーマを作成します。エンティティ オブジェクトの各項目を List 型から Array 型に変換して、カラムにマップします。
// create data columns with schema metadata and the data
var createdAtColumn = new Parquet.Data.DataColumn(
new DataField<DateTimeOffset>("CreatedAt"),
tweets.CreatedAt.ToArray()
);
var createdByColumn = new Parquet.Data.DataColumn(
new DataField<string>("CreatedBy"),
tweets.CreatedBy.ToArray()
);
var sourceColumn = new Parquet.Data.DataColumn(
new DataField<string>("Source"),
tweets.Source.ToArray()
);
var textColumn = new Parquet.Data.DataColumn(
new DataField<string>("Text"),
tweets.Text.ToArray()
);
// create file schema
var schema = new Schema(createdAtColumn.Field, createdByColumn.Field, sourceColumn.Field, textColumn.Field);
##7. Parquet ファイル出力(コード)
出力先のファイル ストリームを作成し、ParquetWriter に渡すことで Parquet ファイルを出力することができます。CreateRowGroup() は、カラムストアを行認識できるようにする為に行グループを作成しています。
// create file
var defaultDir = @"c:\temp\";
var fileName = $"{defaultDir}tweets.parquet";
if (!Directory.Exists(defaultDir))
Directory.CreateDirectory(defaultDir);
Stream stream = File.OpenWrite(fileName);
using (var parquetWriter = new ParquetWriter(schema, stream))
{
// create a new row group in the file
using (ParquetRowGroupWriter groupWriter = parquetWriter.CreateRowGroup())
{
groupWriter.WriteColumn(createdAtColumn);
groupWriter.WriteColumn(createdByColumn);
groupWriter.WriteColumn(sourceColumn);
groupWriter.WriteColumn(textColumn);
}
}
stream.Close();
##8. コード全体
上記で主要なコードについて説明しましたが、以下はコード全体となります。Twitter API 認証用の TwitterClient のパラメーターは置換してください。GitHub にも各ステップのコードを共有しています。
using System;
using System.IO;
using System.Threading.Tasks;
using System.Collections.Generic;
using Tweetinvi;
using Parquet.Data;
using Parquet;
namespace TwitterStreamApiConsole
{
class Program
{
private static readonly int maxCount = 5;
private static int counter;
static void Main(string[] args)
{
Console.WriteLine($"***** Stream started. {DateTime.UtcNow}");
counter = 0;
var tweets = new TweetsEntity();
tweets.CreatedAt = new List<DateTimeOffset>();
tweets.CreatedBy = new List<string>();
tweets.Source = new List<string>();
tweets.Text = new List<string>();
StartFilteredStream(tweets);
Console.ReadLine();
}
private static async void StartFilteredStream(TweetsEntity tweets)
{
// User client & stream
var client = new TwitterClient("<your API Key>", "<your API Secret>",
"<your Access Token>", "<your Access Token Secret>");
var stream = client.Streams.CreateFilteredStream();
// Add filters
stream.AddTrack("コロナ");
stream.AddTrack("大変");
// Read stream
stream.MatchingTweetReceived += (sender, args) =>
{
var lang = args.Tweet.Language;
// Specify Japanese & Remove bot tweets
if (lang == Tweetinvi.Models.Language.Japanese && args.Tweet.Source.Contains(">Twitter "))
{
Console.WriteLine("----------------------------------------------------------------------");
Console.WriteLine($"** CreatedAt : {args.Tweet.CreatedAt}");
Console.WriteLine($"** CreatedBy : {args.Tweet.CreatedBy}");
Console.WriteLine($"** Source : {args.Tweet.Source}");
Console.WriteLine($"** Text : {args.Tweet.Text}");
tweets.CreatedAt.Add(args.Tweet.CreatedAt);
tweets.CreatedBy.Add(args.Tweet.CreatedBy.ToString());
tweets.Source.Add(args.Tweet.Source);
tweets.Text.Add(args.Tweet.Text);
}
++counter;
if (counter > maxCount)
{
stream.Stop();
}
};
await stream.StartMatchingAllConditionsAsync();
Console.WriteLine("***** Stream stopped.");
await CreateParquetFile(tweets);
}
private static async Task CreateParquetFile(TweetsEntity tweets)
{
////////////////////////////////////////////////////////////////////////////////////////
/// Write Parquet file
/// https://github.com/aloneguid/parquet-dotnet
////////////////////////////////////////////////////////////////////////////////////////
// create data columns with schema metadata and the data
var createdAtColumn = new Parquet.Data.DataColumn(
new DataField<DateTimeOffset>("CreatedAt"),
tweets.CreatedAt.ToArray()
);
var createdByColumn = new Parquet.Data.DataColumn(
new DataField<string>("CreatedBy"),
tweets.CreatedBy.ToArray()
);
var sourceColumn = new Parquet.Data.DataColumn(
new DataField<string>("Source"),
tweets.Source.ToArray()
);
var textColumn = new Parquet.Data.DataColumn(
new DataField<string>("Text"),
tweets.Text.ToArray()
);
// create file schema
var schema = new Schema(createdAtColumn.Field, createdByColumn.Field, sourceColumn.Field, textColumn.Field);
// create file
var defaultDir = @"c:\temp\";
var fileName = $"{defaultDir}tweets.parquet";
if (!Directory.Exists(defaultDir))
Directory.CreateDirectory(defaultDir);
Stream stream = File.OpenWrite(fileName);
using (var parquetWriter = new ParquetWriter(schema, stream))
{
// create a new row group in the file
using (ParquetRowGroupWriter groupWriter = parquetWriter.CreateRowGroup())
{
groupWriter.WriteColumn(createdAtColumn);
groupWriter.WriteColumn(createdByColumn);
groupWriter.WriteColumn(sourceColumn);
groupWriter.WriteColumn(textColumn);
}
}
stream.Close();
}
private class TweetsEntity
{
public List<DateTimeOffset> CreatedAt { set; get; }
public List<string> CreatedBy { set; get; }
public List<string> Source { set; get; }
public List<string> Text { set; get; }
}
}
}
##9. デバッグ実行
デバッグ実行して、c:\temp ディレクトリに tweets.parquet ファイルが出力されていれば、成功です。お疲れ様でした。
#次のステップへ
Step4 では、Parquet ファイルを Azure Data Lake Storage Gen2 (ADLS Gen2) に出力できるようにします。
#参照
Apache Parquet for .Net Platform サイト
C# 向け Twitter API SDK (TweetinviAPI) Nuget Package サイト
TweetinviAPI - Filtered Stream API リファレンス
クロス プラットフォーム .NET 概要
Twitter 開発者向けサイト
Twitter API サイト
Visual Studio 2019 Community サイト
Visual Studio Code のサイト