今回は、Step3 です。Step2 で作成したコードをベースに編集を行いますので、Twitter API についての説明を省略します。Twitter API については、Step1、Step2 をご参照ください。
講座の最終目標
Twitter のツイート データを Azure Data Lake Storage Gen2 (ADLS Gen2) に Parquet 形式のファイルとして自動的に蓄積し、Azure Synapse Analytics (Serverless SQL / Apach Spark) を使って分析できるようにします。ツイート データの継続的な取得と ADLS Gen2 へのデータ蓄積には、Azure Functions を利用します。
Step3 の目標
Parquet ファイル出力 を C# からできるようにすることが目標となります。Parquet 形式のファイルは、Hadoop (Azure HDInsight) / Spark (Azure Databricks, Azure Synapse - Apache Spark) / Azure Synapse - Serverless SQL など、ビッグデータ処理には欠かせないスケーラブルなカラムストア型ファイルになります。Step1、Step2 同様に、Azure は関係なく、ローカル環境に単純なコンソール アプリケーションを作成し、動作させます。
開発環境 (OSS)
開発には、以下を利用します。OSS (無償) かつ クロス プラットフォームとなりますので、Windows / Mac / Linux などお好きな OS/デバイスをご利用ください。
- ツール : Visual Studio 2019 Community or VS Code
- ランタイム : .NET 5 or .NET Core 3.1
- 言語 : C#
C# コードの開発
今回の手順では、Visual Studio 2019 を利用します。
1. Step2 で作成したプロジェクトを開きます
Visual Studio 2019 を起動し、Step2 で作成したプロジェクトを開きます。
2. Parquet ファイル入出力用 Nuget パッケージの取得
Visual Studio の「Nuget パッケージの管理」機能を使って、(Apache Parquet for .Net Platform) を取得します。
3. using ディレクティブの追加(コード)
Parquet ファイル出力に必要なライブラリ分を追加します。
using System;
using System.IO;
using System.Threading.Tasks;
using System.Collections.Generic;
using Tweetinvi;
using Parquet.Data;
using Parquet;
4. エンティティ クラスの定義(コード)
ファイル出力用にツイートデータを格納する為のエンティティ クラスを定義します。ストリームから取得可能なツイート作成日時、作成者、作成ソース、ツイート テキストの4項目を利用します。カラムストアへの変換の容易性を考慮し、項目毎にリストで定義します。
private class TweetsEntity
{
public List<DateTimeOffset> CreatedAt { set; get; }
public List<string> CreatedBy { set; get; }
public List<string> Source { set; get; }
public List<string> Text { set; get; }
}
5. エンティティ オブジェクトの初期化と編集(コード)
Main メソッドでエンティティ オブジェクトを初期化し、MatchingTweetReceived イベントハンドラーで編集を行います。
var tweets = new TweetsEntity();
tweets.CreatedAt = new List<DateTimeOffset>();
tweets.CreatedBy = new List<string>();
tweets.Source = new List<string>();
tweets.Text = new List<string>();
stream.MatchingTweetReceived += (sender, args) =>
{
var lang = args.Tweet.Language;
// Specify Japanese & Remove bot tweets
if (lang == Tweetinvi.Models.Language.Japanese && args.Tweet.Source.Contains(">Twitter "))
{
Console.WriteLine("----------------------------------------------------------------------");
Console.WriteLine($"** CreatedAt : {args.Tweet.CreatedAt}");
Console.WriteLine($"** CreatedBy : {args.Tweet.CreatedBy}");
Console.WriteLine($"** Source : {args.Tweet.Source}");
Console.WriteLine($"** Text : {args.Tweet.Text}");
tweets.CreatedAt.Add(args.Tweet.CreatedAt);
tweets.CreatedBy.Add(args.Tweet.CreatedBy.ToString());
tweets.Source.Add(args.Tweet.Source);
tweets.Text.Add(args.Tweet.Text);
}
++counter;
if (counter > maxCount)
{
stream.Stop();
}
};
6. Parquet スキーマ作成(コード)
出力対象のカラム属性を定義し、スキーマを作成します。エンティティ オブジェクトの各項目を List 型から Array 型に変換して、カラムにマップします。
// create data columns with schema metadata and the data
var createdAtColumn = new Parquet.Data.DataColumn(
new DataField<DateTimeOffset>("CreatedAt"),
tweets.CreatedAt.ToArray()
);
var createdByColumn = new Parquet.Data.DataColumn(
new DataField<string>("CreatedBy"),
tweets.CreatedBy.ToArray()
);
var sourceColumn = new Parquet.Data.DataColumn(
new DataField<string>("Source"),
tweets.Source.ToArray()
);
var textColumn = new Parquet.Data.DataColumn(
new DataField<string>("Text"),
tweets.Text.ToArray()
);
// create file schema
var schema = new Schema(createdAtColumn.Field, createdByColumn.Field, sourceColumn.Field, textColumn.Field);
7. Parquet ファイル出力(コード)
出力先のファイル ストリームを作成し、ParquetWriter に渡すことで Parquet ファイルを出力することができます。CreateRowGroup() は、カラムストアを行認識できるようにする為に行グループを作成しています。
// create file
var defaultDir = @"c:\temp\";
var fileName = $"{defaultDir}tweets.parquet";
if (!Directory.Exists(defaultDir))
Directory.CreateDirectory(defaultDir);
Stream stream = File.OpenWrite(fileName);
using (var parquetWriter = new ParquetWriter(schema, stream))
{
// create a new row group in the file
using (ParquetRowGroupWriter groupWriter = parquetWriter.CreateRowGroup())
{
groupWriter.WriteColumn(createdAtColumn);
groupWriter.WriteColumn(createdByColumn);
groupWriter.WriteColumn(sourceColumn);
groupWriter.WriteColumn(textColumn);
}
}
stream.Close();
8. コード全体
上記で主要なコードについて説明しましたが、以下はコード全体となります。Twitter API 認証用の TwitterClient のパラメーターは置換してください。GitHub にも各ステップのコードを共有しています。
using System;
using System.IO;
using System.Threading.Tasks;
using System.Collections.Generic;
using Tweetinvi;
using Parquet.Data;
using Parquet;
namespace TwitterStreamApiConsole
{
class Program
{
private static readonly int maxCount = 5;
private static int counter;
static void Main(string[] args)
{
Console.WriteLine($"***** Stream started. {DateTime.UtcNow}");
counter = 0;
var tweets = new TweetsEntity();
tweets.CreatedAt = new List<DateTimeOffset>();
tweets.CreatedBy = new List<string>();
tweets.Source = new List<string>();
tweets.Text = new List<string>();
StartFilteredStream(tweets);
Console.ReadLine();
}
private static async void StartFilteredStream(TweetsEntity tweets)
{
// User client & stream
var client = new TwitterClient("<your API Key>", "<your API Secret>",
"<your Access Token>", "<your Access Token Secret>");
var stream = client.Streams.CreateFilteredStream();
// Add filters
stream.AddTrack("コロナ");
stream.AddTrack("大変");
// Read stream
stream.MatchingTweetReceived += (sender, args) =>
{
var lang = args.Tweet.Language;
// Specify Japanese & Remove bot tweets
if (lang == Tweetinvi.Models.Language.Japanese && args.Tweet.Source.Contains(">Twitter "))
{
Console.WriteLine("----------------------------------------------------------------------");
Console.WriteLine($"** CreatedAt : {args.Tweet.CreatedAt}");
Console.WriteLine($"** CreatedBy : {args.Tweet.CreatedBy}");
Console.WriteLine($"** Source : {args.Tweet.Source}");
Console.WriteLine($"** Text : {args.Tweet.Text}");
tweets.CreatedAt.Add(args.Tweet.CreatedAt);
tweets.CreatedBy.Add(args.Tweet.CreatedBy.ToString());
tweets.Source.Add(args.Tweet.Source);
tweets.Text.Add(args.Tweet.Text);
}
++counter;
if (counter > maxCount)
{
stream.Stop();
}
};
await stream.StartMatchingAllConditionsAsync();
Console.WriteLine("***** Stream stopped.");
await CreateParquetFile(tweets);
}
private static async Task CreateParquetFile(TweetsEntity tweets)
{
////////////////////////////////////////////////////////////////////////////////////////
/// Write Parquet file
/// https://github.com/aloneguid/parquet-dotnet
////////////////////////////////////////////////////////////////////////////////////////
// create data columns with schema metadata and the data
var createdAtColumn = new Parquet.Data.DataColumn(
new DataField<DateTimeOffset>("CreatedAt"),
tweets.CreatedAt.ToArray()
);
var createdByColumn = new Parquet.Data.DataColumn(
new DataField<string>("CreatedBy"),
tweets.CreatedBy.ToArray()
);
var sourceColumn = new Parquet.Data.DataColumn(
new DataField<string>("Source"),
tweets.Source.ToArray()
);
var textColumn = new Parquet.Data.DataColumn(
new DataField<string>("Text"),
tweets.Text.ToArray()
);
// create file schema
var schema = new Schema(createdAtColumn.Field, createdByColumn.Field, sourceColumn.Field, textColumn.Field);
// create file
var defaultDir = @"c:\temp\";
var fileName = $"{defaultDir}tweets.parquet";
if (!Directory.Exists(defaultDir))
Directory.CreateDirectory(defaultDir);
Stream stream = File.OpenWrite(fileName);
using (var parquetWriter = new ParquetWriter(schema, stream))
{
// create a new row group in the file
using (ParquetRowGroupWriter groupWriter = parquetWriter.CreateRowGroup())
{
groupWriter.WriteColumn(createdAtColumn);
groupWriter.WriteColumn(createdByColumn);
groupWriter.WriteColumn(sourceColumn);
groupWriter.WriteColumn(textColumn);
}
}
stream.Close();
}
private class TweetsEntity
{
public List<DateTimeOffset> CreatedAt { set; get; }
public List<string> CreatedBy { set; get; }
public List<string> Source { set; get; }
public List<string> Text { set; get; }
}
}
}
9. デバッグ実行
デバッグ実行して、c:\temp ディレクトリに tweets.parquet ファイルが出力されていれば、成功です。お疲れ様でした。
次のステップへ
Step4 では、Parquet ファイルを Azure Data Lake Storage Gen2 (ADLS Gen2) に出力できるようにします。
参照
Apache Parquet for .Net Platform サイト
C# 向け Twitter API SDK (TweetinviAPI) Nuget Package サイト
TweetinviAPI - Filtered Stream API リファレンス
クロス プラットフォーム .NET 概要
Twitter 開発者向けサイト
Twitter API サイト
Visual Studio 2019 Community サイト
Visual Studio Code のサイト