LoginSignup
1
0

More than 3 years have passed since last update.

[講座] Twitter Stream API を使って Azure でデータ分析 - Step3 (Parquet ファイル出力)

Last updated at Posted at 2021-05-17

今回は、Step3 です。Step2 で作成したコードをベースに編集を行いますので、Twitter API についての説明を省略します。Twitter API については、Step1Step2 をご参照ください。

講座の最終目標

Twitter のツイート データを Azure Data Lake Storage Gen2 (ADLS Gen2) に Parquet 形式のファイルとして自動的に蓄積し、Azure Synapse Analytics (Serverless SQL / Apach Spark) を使って分析できるようにします。ツイート データの継続的な取得と ADLS Gen2 へのデータ蓄積には、Azure Functions を利用します。
image.png

Step3 の目標

Parquet ファイル出力 を C# からできるようにすることが目標となります。Parquet 形式のファイルは、Hadoop (Azure HDInsight) / Spark (Azure Databricks, Azure Synapse - Apache Spark) / Azure Synapse - Serverless SQL など、ビッグデータ処理には欠かせないスケーラブルなカラムストア型ファイルになります。Step1、Step2 同様に、Azure は関係なく、ローカル環境に単純なコンソール アプリケーションを作成し、動作させます。

開発環境 (OSS)

開発には、以下を利用します。OSS (無償) かつ クロス プラットフォームとなりますので、Windows / Mac / Linux などお好きな OS/デバイスをご利用ください。

C# コードの開発

今回の手順では、Visual Studio 2019 を利用します。

1. Step2 で作成したプロジェクトを開きます

Visual Studio 2019 を起動し、Step2 で作成したプロジェクトを開きます。

2. Parquet ファイル入出力用 Nuget パッケージの取得

Visual Studio の「Nuget パッケージの管理」機能を使って、(Apache Parquet for .Net Platform) を取得します。
image.png

3. using ディレクティブの追加(コード)

Parquet ファイル出力に必要なライブラリ分を追加します。

using System;
using System.IO;
using System.Threading.Tasks;
using System.Collections.Generic;
using Tweetinvi;
using Parquet.Data;
using Parquet;

4. エンティティ クラスの定義(コード)

ファイル出力用にツイートデータを格納する為のエンティティ クラスを定義します。ストリームから取得可能なツイート作成日時、作成者、作成ソース、ツイート テキストの4項目を利用します。カラムストアへの変換の容易性を考慮し、項目毎にリストで定義します。

private class TweetsEntity
{
    public List<DateTimeOffset> CreatedAt { set; get; }
    public List<string> CreatedBy { set; get; }
    public List<string> Source { set; get; }
    public List<string> Text { set; get; }
}

5. エンティティ オブジェクトの初期化と編集(コード)

Main メソッドでエンティティ オブジェクトを初期化し、MatchingTweetReceived イベントハンドラーで編集を行います。

Main
var tweets = new TweetsEntity();
tweets.CreatedAt = new List<DateTimeOffset>();
tweets.CreatedBy = new List<string>();
tweets.Source = new List<string>();
tweets.Text = new List<string>();
MatchingTweetReceived
stream.MatchingTweetReceived += (sender, args) =>
{
    var lang = args.Tweet.Language;
    // Specify Japanese & Remove bot tweets
    if (lang == Tweetinvi.Models.Language.Japanese && args.Tweet.Source.Contains(">Twitter "))
    {
        Console.WriteLine("----------------------------------------------------------------------");
        Console.WriteLine($"** CreatedAt : {args.Tweet.CreatedAt}");
        Console.WriteLine($"** CreatedBy : {args.Tweet.CreatedBy}");
        Console.WriteLine($"** Source    : {args.Tweet.Source}");
        Console.WriteLine($"** Text      : {args.Tweet.Text}");

        tweets.CreatedAt.Add(args.Tweet.CreatedAt);
        tweets.CreatedBy.Add(args.Tweet.CreatedBy.ToString());
        tweets.Source.Add(args.Tweet.Source);
        tweets.Text.Add(args.Tweet.Text);
    }
    ++counter;
    if (counter > maxCount)
    {
        stream.Stop();
    }
};

6. Parquet スキーマ作成(コード)

出力対象のカラム属性を定義し、スキーマを作成します。エンティティ オブジェクトの各項目を List 型から Array 型に変換して、カラムにマップします。

// create data columns with schema metadata and the data
var createdAtColumn = new Parquet.Data.DataColumn(
    new DataField<DateTimeOffset>("CreatedAt"),
    tweets.CreatedAt.ToArray()
);
var createdByColumn = new Parquet.Data.DataColumn(
    new DataField<string>("CreatedBy"),
    tweets.CreatedBy.ToArray()
);
var sourceColumn = new Parquet.Data.DataColumn(
    new DataField<string>("Source"),
    tweets.Source.ToArray()
);
var textColumn = new Parquet.Data.DataColumn(
    new DataField<string>("Text"),
    tweets.Text.ToArray()
);

// create file schema
var schema = new Schema(createdAtColumn.Field, createdByColumn.Field, sourceColumn.Field, textColumn.Field);

7. Parquet ファイル出力(コード)

出力先のファイル ストリームを作成し、ParquetWriter に渡すことで Parquet ファイルを出力することができます。CreateRowGroup() は、カラムストアを行認識できるようにする為に行グループを作成しています。

// create file
var defaultDir = @"c:\temp\";
var fileName = $"{defaultDir}tweets.parquet";
if (!Directory.Exists(defaultDir))
    Directory.CreateDirectory(defaultDir);
Stream stream = File.OpenWrite(fileName);

using (var parquetWriter = new ParquetWriter(schema, stream))
{
    // create a new row group in the file
    using (ParquetRowGroupWriter groupWriter = parquetWriter.CreateRowGroup())
    {
        groupWriter.WriteColumn(createdAtColumn);
        groupWriter.WriteColumn(createdByColumn);
        groupWriter.WriteColumn(sourceColumn);
        groupWriter.WriteColumn(textColumn);
    }
}

stream.Close();

8. コード全体

上記で主要なコードについて説明しましたが、以下はコード全体となります。Twitter API 認証用の TwitterClient のパラメーターは置換してください。GitHub にも各ステップのコードを共有しています。

Program.cs
using System;
using System.IO;
using System.Threading.Tasks;
using System.Collections.Generic;
using Tweetinvi;
using Parquet.Data;
using Parquet;

namespace TwitterStreamApiConsole
{
    class Program
    {
        private static readonly int maxCount = 5;
        private static int counter;

        static void Main(string[] args)
        {
            Console.WriteLine($"***** Stream started. {DateTime.UtcNow}");

            counter = 0;
            var tweets = new TweetsEntity();
            tweets.CreatedAt = new List<DateTimeOffset>();
            tweets.CreatedBy = new List<string>();
            tweets.Source = new List<string>();
            tweets.Text = new List<string>();

            StartFilteredStream(tweets);

            Console.ReadLine();
        }

        private static async void StartFilteredStream(TweetsEntity tweets)
        {
            // User client & stream
            var client = new TwitterClient("<your API Key>", "<your API Secret>", 
                                             "<your Access Token>", "<your Access Token Secret>");
            var stream = client.Streams.CreateFilteredStream();

            // Add filters
            stream.AddTrack("コロナ");
            stream.AddTrack("大変");

            // Read stream
            stream.MatchingTweetReceived += (sender, args) =>
            {
                var lang = args.Tweet.Language;
                // Specify Japanese & Remove bot tweets
                if (lang == Tweetinvi.Models.Language.Japanese && args.Tweet.Source.Contains(">Twitter "))
                {
                    Console.WriteLine("----------------------------------------------------------------------");
                    Console.WriteLine($"** CreatedAt : {args.Tweet.CreatedAt}");
                    Console.WriteLine($"** CreatedBy : {args.Tweet.CreatedBy}");
                    Console.WriteLine($"** Source    : {args.Tweet.Source}");
                    Console.WriteLine($"** Text      : {args.Tweet.Text}");

                    tweets.CreatedAt.Add(args.Tweet.CreatedAt);
                    tweets.CreatedBy.Add(args.Tweet.CreatedBy.ToString());
                    tweets.Source.Add(args.Tweet.Source);
                    tweets.Text.Add(args.Tweet.Text);
                }
                ++counter;
                if (counter > maxCount)
                {
                    stream.Stop();
                }
            };
            await stream.StartMatchingAllConditionsAsync();

            Console.WriteLine("***** Stream stopped.");
            await CreateParquetFile(tweets);
        }

        private static async Task CreateParquetFile(TweetsEntity tweets)
        {
            ////////////////////////////////////////////////////////////////////////////////////////
            /// Write Parquet file
            /// https://github.com/aloneguid/parquet-dotnet
            ////////////////////////////////////////////////////////////////////////////////////////

            // create data columns with schema metadata and the data
            var createdAtColumn = new Parquet.Data.DataColumn(
                new DataField<DateTimeOffset>("CreatedAt"),
                tweets.CreatedAt.ToArray()
            );
            var createdByColumn = new Parquet.Data.DataColumn(
                new DataField<string>("CreatedBy"),
                tweets.CreatedBy.ToArray()
            );
            var sourceColumn = new Parquet.Data.DataColumn(
                new DataField<string>("Source"),
                tweets.Source.ToArray()
            );
            var textColumn = new Parquet.Data.DataColumn(
                new DataField<string>("Text"),
                tweets.Text.ToArray()
            );

            // create file schema
            var schema = new Schema(createdAtColumn.Field, createdByColumn.Field, sourceColumn.Field, textColumn.Field);

            // create file
            var defaultDir = @"c:\temp\";
            var fileName = $"{defaultDir}tweets.parquet";
            if (!Directory.Exists(defaultDir))
                Directory.CreateDirectory(defaultDir);
            Stream stream = File.OpenWrite(fileName);

            using (var parquetWriter = new ParquetWriter(schema, stream))
            {
                // create a new row group in the file
                using (ParquetRowGroupWriter groupWriter = parquetWriter.CreateRowGroup())
                {
                    groupWriter.WriteColumn(createdAtColumn);
                    groupWriter.WriteColumn(createdByColumn);
                    groupWriter.WriteColumn(sourceColumn);
                    groupWriter.WriteColumn(textColumn);
                }
            }

            stream.Close();
        }

        private class TweetsEntity
        {
            public List<DateTimeOffset> CreatedAt { set; get; }
            public List<string> CreatedBy { set; get; }
            public List<string> Source { set; get; }
            public List<string> Text { set; get; }
        }
    }
}

9. デバッグ実行

デバッグ実行して、c:\temp ディレクトリに tweets.parquet ファイルが出力されていれば、成功です。お疲れ様でした。
image.png

次のステップへ

Step4 では、Parquet ファイルを Azure Data Lake Storage Gen2 (ADLS Gen2) に出力できるようにします。

参照

Apache Parquet for .Net Platform サイト
C# 向け Twitter API SDK (TweetinviAPI) Nuget Package サイト
TweetinviAPI - Filtered Stream API リファレンス
クロス プラットフォーム .NET 概要
Twitter 開発者向けサイト
Twitter API サイト
Visual Studio 2019 Community サイト
Visual Studio Code のサイト

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0