0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

[講座] Twitter Stream API を使って Azure でデータ分析 - Step4 (ADLS Gen2 への Parquet ファイル出力)

Last updated at Posted at 2021-05-17

今回は、Step4 です。Step3 で作成したコードをベースに編集を行います。ここでは、Twitter API についての説明を省略しますので、Twitter API については、Step1Step2 をご参照ください。

#講座の最終目標
Twitter のツイート データを Azure Data Lake Storage Gen2 (ADLS Gen2) に Parquet 形式のファイルとして自動的に蓄積し、Azure Synapse Analytics (Serverless SQL / Apach Spark) を使って分析できるようにします。ツイート データの継続的な取得と ADLS Gen2 へのデータ蓄積には、Azure Functions を利用します。
image.png

#Step4 の目標
Azure Data Lake Storage Gen2 (ADLS Gen2)Parquet ファイル を C# から出力できるようにすることが目標となります。Parquet 形式のファイルは、Hadoop (Azure HDInsight) / Spark (Azure Databricks, Azure Synapse - Apache Spark) / Azure Synapse - Serverless SQL など、ビッグデータ処理には欠かせないスケーラブルなカラムストア型ファイルになります。アプリ自体は、ローカル環境で動作するコンソール アプリケーションとなります。

#開発環境 (OSS)
開発には、以下を利用します。OSS (無償) かつ クロス プラットフォームとなりますので、Windows / Mac / Linux などお好きな OS/デバイスをご利用ください。

#C# コードの開発
今回の手順では、Visual Studio 2019 を利用します。

##1. Step3 で作成したプロジェクトを開きます
Visual Studio 2019 を起動し、Step3 で作成したプロジェクトを開きます。

##2. Azure Blob Storage (ADLS Gen2 含む) 入出力用 Nuget パッケージの取得
Visual Studio の「Nuget パッケージの管理」機能を使って、(Azure Storage Blob パッケージ) を取得します。
image.png

##3. using ディレクティブの追加(コード)
ADLS Gen2 ストレージへの出力に必要なライブラリ分を追加します。

Program.cs
using System;
using System.IO;
using System.Threading.Tasks;
using System.Collections.Generic;
using Tweetinvi;
using Parquet.Data;
using Parquet;
using Azure.Storage.Blobs;

##4. ADLS Gen2 ファイルパスの生成(コード)
Spark や Synapse Serverless SQL でスケーラブルな IO を実現できるように、日時によるパス (パーティション化) とファイル名が重ならないように GUID を伴うファイルパスを生成するようにします。

dt = DateTime.UtcNow;
blobFileName = $"./tweetdata/{dt.ToString("yyyy")}/{dt.ToString("MM")}/{dt.ToString("dd")}/{dt.ToString("HH")}/tw_{Guid.NewGuid().ToString("D")}.parquet";

##5. メモリ ストリームの利用(コード)
Step3 でのローカル ファイル出力では、ファイル ストリームを使いましたが、ADLS Gen2 への出力を考慮して、Parquet ファイル作成にはメモリ ストリームを利用します。

Stream stream = new MemoryStream();
using (var parquetWriter = new ParquetWriter(schema, stream))
{
    // create a new row group in the file
    using (ParquetRowGroupWriter groupWriter = parquetWriter.CreateRowGroup())
    {
        groupWriter.WriteColumn(createdAtColumn);
        groupWriter.WriteColumn(createdByColumn);
        groupWriter.WriteColumn(sourceColumn);
        groupWriter.WriteColumn(textColumn);
    }
}

##6. ADLS Gen2 Storage への出力(コード)
事前に ADLS Gen2 Storage (階層型 Blob Storage) とファイル システム (コンテナー) を作成しておきます。ストレージ アカウントの接続文字列とファイル システム名 (コンテナー名) を取得しておき、以下のパラメーターで利用します。手順 4 の Parquet ファイルを出力したメモリ ストリームを使って、ADLS Gen2 にファイルをアップロードします。

// Write to Blob (ADLS Gen2) storage
var connectionString = "<your ADLS Gen2 storage connection string>";
var blobServiceClient = new BlobServiceClient(connectionString);
var containerClient = blobServiceClient.GetBlobContainerClient("<your filesystem name>");

// Get a reference to a blob
BlobClient blobClient = containerClient.GetBlobClient(blobFileName);
stream.Position = 0;
await blobClient.UploadAsync(stream);
stream.Close();

##7. コード全体
上記で主要なコードについて説明しましたが、以下はコード全体となります。Twitter API 認証用の TwitterClient のパラメーター、および、ADLS Gen2 出力用のパラメーター (ストレージ アカウントの接続文字列とファイル システム名) を置換してください。GitHub にも各ステップのコードを共有しています。

Program.cs
using System;
using System.IO;
using System.Threading.Tasks;
using System.Collections.Generic;
using Tweetinvi;
using Parquet.Data;
using Parquet;
using Azure.Storage.Blobs;

namespace TwitterStreamApiConsole
{
    class Program
    {
        private static readonly int maxCount = 5;
        private static int counter;
        private static DateTime dt;
        private static string blobFileName;

        static void Main(string[] args)
        {
            Console.WriteLine($"***** Stream started. {DateTime.UtcNow}");
            dt = DateTime.UtcNow;
            blobFileName = $"./tweetdata/{dt.ToString("yyyy")}/{dt.ToString("MM")}/{dt.ToString("dd")}/{dt.ToString("HH")}/tw_{Guid.NewGuid().ToString("D")}.parquet";

            counter = 0;
            var tweets = new TweetsEntity();
            tweets.CreatedAt = new List<DateTimeOffset>();
            tweets.CreatedBy = new List<string>();
            tweets.Source = new List<string>();
            tweets.Text = new List<string>();

            StartFilteredStream(tweets);

            Console.ReadLine();
        }

        private static async void StartFilteredStream(TweetsEntity tweets)
        {
            // User client & stream
            var client = new TwitterClient("<your API Key>", "<your API Secret>", 
                                             "<your Access Token>", "<your Access Token Secret>");
            var stream = client.Streams.CreateFilteredStream();

            // Add filters
            stream.AddTrack("コロナ");
            stream.AddTrack("大変");

            // Read stream
            stream.MatchingTweetReceived += (sender, args) =>
            {
                var lang = args.Tweet.Language;
                // Specify Japanese & Remove bot tweets
                if (lang == Tweetinvi.Models.Language.Japanese && args.Tweet.Source.Contains(">Twitter "))
                {
                    Console.WriteLine("----------------------------------------------------------------------");
                    Console.WriteLine($"** CreatedAt : {args.Tweet.CreatedAt}");
                    Console.WriteLine($"** CreatedBy : {args.Tweet.CreatedBy}");
                    Console.WriteLine($"** Source    : {args.Tweet.Source}");
                    Console.WriteLine($"** Text      : {args.Tweet.Text}");

                    tweets.CreatedAt.Add(args.Tweet.CreatedAt);
                    tweets.CreatedBy.Add(args.Tweet.CreatedBy.ToString());
                    tweets.Source.Add(args.Tweet.Source);
                    tweets.Text.Add(args.Tweet.Text);
                }
                ++counter;
                if (counter > maxCount)
                {
                    stream.Stop();
                }
            };
            await stream.StartMatchingAllConditionsAsync();

            Console.WriteLine("***** Stream stopped.");
            await CreateParquetFile(tweets);
        }

        private static async Task CreateParquetFile(TweetsEntity tweets)
        {
            ////////////////////////////////////////////////////////////////////////////////////////
            /// Write Parquet file
            /// https://github.com/aloneguid/parquet-dotnet
            /// https://docs.microsoft.com/ja-jp/azure/storage/blobs/storage-quickstart-blobs-dotnet
            ////////////////////////////////////////////////////////////////////////////////////////

            // create data columns with schema metadata and the data
            var createdAtColumn = new Parquet.Data.DataColumn(
                new DataField<DateTimeOffset>("CreatedAt"),
                tweets.CreatedAt.ToArray()
            );
            var createdByColumn = new Parquet.Data.DataColumn(
                new DataField<string>("CreatedBy"),
                tweets.CreatedBy.ToArray()
            );
            var sourceColumn = new Parquet.Data.DataColumn(
                new DataField<string>("Source"),
                tweets.Source.ToArray()
            );
            var textColumn = new Parquet.Data.DataColumn(
                new DataField<string>("Text"),
                tweets.Text.ToArray()
            );

            // create file schema
            var schema = new Schema(createdAtColumn.Field, createdByColumn.Field, sourceColumn.Field, textColumn.Field);

            // create file
            Stream stream = new MemoryStream();
            using (var parquetWriter = new ParquetWriter(schema, stream))
            {
                // create a new row group in the file
                using (ParquetRowGroupWriter groupWriter = parquetWriter.CreateRowGroup())
                {
                    groupWriter.WriteColumn(createdAtColumn);
                    groupWriter.WriteColumn(createdByColumn);
                    groupWriter.WriteColumn(sourceColumn);
                    groupWriter.WriteColumn(textColumn);
                }
            }

            // Write to Blob storage
            var connectionString = "<your ADLS Gen2 storage connection string>";
            var blobServiceClient = new BlobServiceClient(connectionString);
            var containerClient = blobServiceClient.GetBlobContainerClient("<your filesystem name>");

            // Get a reference to a blob
            BlobClient blobClient = containerClient.GetBlobClient(blobFileName);
            stream.Position = 0;
            await blobClient.UploadAsync(stream);
            stream.Close();
        }

        private class TweetsEntity
        {
            public List<DateTimeOffset> CreatedAt { set; get; }
            public List<string> CreatedBy { set; get; }
            public List<string> Source { set; get; }
            public List<string> Text { set; get; }
        }
    }
}

##8. デバッグ実行
デバッグ実行して、ADLS Gen2 上に指定した日時パスを伴う Parquet ファイルが出力されていれば、成功です。お疲れ様でした。
image.png

#次のステップへ
Step5 では、これまで作成してきたコンソール アプリケーションのコードをタイマーで起動する Azure Functions に載せ替えます。Azure Functions を利用することで、永続的な取り込みを実現します。

#参照
Azure Storage Blob Nuget Package サイト
Apache Parquet for .Net Platform サイト
C# 向け Twitter API SDK (TweetinviAPI) Nuget Package サイト
TweetinviAPI - Filtered Stream API リファレンス
クロス プラットフォーム .NET 概要
Twitter 開発者向けサイト
Twitter API サイト
Visual Studio 2019 Community サイト
Visual Studio Code のサイト

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?