1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

[講座] Twitter Stream API を使って Azure でデータ分析 - Step2 (Filtered Stream API)

Last updated at Posted at 2021-05-14

今回は、Step2 です。Step1 で作成したコードをベースに編集を行いますので、Twitter API 環境の整備については説明を省略します。

#講座の最終目標
Twitter のツイート データを Azure Data Lake Storage Gen2 (ADLS Gen2) に Parquet 形式のファイルとして自動的に蓄積し、Azure Synapse Analytics (Serverless SQL / Apach Spark) を使って分析できるようにします。ツイート データの継続的な取得と ADLS Gen2 へのデータ蓄積には、Azure Functions を利用します。
image.png

#Step2 の目標
Filtered Stream API を C# から扱えるようにすることが目標となります。この API の特徴は、複数のキーワードでストリームをフィルタリング可能ですので、特定の情報を収集する際に向いています。SDK では、この機能は、まだ V2 プレビューに対応していない為、V1.1 を利用します。Step1 同様に、Azure は関係なく、ローカル環境に単純なコンソール アプリケーションを作成し、動作させます。

#Twitter API
Twitter の開発者向けプログラムを通して提供される API です。各 API では要求に対するレート制限が設定されており、Stream API の一般的な無償利用の場合、50 万ツイート/月の取得制限が入ります。

#開発環境 (OSS)
開発には、以下を利用します。OSS (無償) かつ クロス プラットフォームとなりますので、Windows / Mac / Linux などお好きな OS/デバイスをご利用ください。

#Twitter API を呼び出す C# コードの開発
今回の手順では、Visual Studio 2019 を利用します。

##1. Step1 で作成したプロジェクトを開きます
Visual Studio 2019 を起動し、Step1 で作成したプロジェクトを開きます。

##2. 認証とストリームの作成(コード)
Step1 の Sampled Stream V2 API と異なるのは、アプリ認証ではなく、ユーザー認証となることです。Twitter Developer ポータルで作成/取得した API Key / API Secret / Access Token / Access Token Secret の値で、以下のパラメーターを置き換えてください。ストリームを作成するクラスも異なりますので、ご注意ください。

var client = new TwitterClient("<your API Key>", "<your API Secret>", 
                                  "<your Access Token>", "<your Access Token Secret>");
var stream = client.Streams.CreateFilteredStream();

##3. フィルターの設定(コード)
フィルターは複数設定することができます。

stream.AddTrack("コロナ");
stream.AddTrack("大変");

##4. イベントハンドラーと処理内容の記述(コード)
V2 プレビュー API と V1.1 API では、Language の扱いが異なりますので、日本語の判定方法を変えています。また、v1.1 API では args.Tweet.Source に HTML タグを含む値が入ってきますので、ご注意ください。args.Tweet.Source.Contains(">Twitter ") の部分は、ボットによるツイートを除去するような条件を入れています。

stream.MatchingTweetReceived += (sender, args) =>
{
    var lang = args.Tweet.Language;
    // Specify Japanese & Remove bot tweets
    if (lang == Tweetinvi.Models.Language.Japanese && args.Tweet.Source.Contains(">Twitter "))
    {
        Console.WriteLine($"** Text : {args.Tweet.Text}");
    }
    ++counter;
    if (counter >= maxCount)
    {
        stream.Stop();
    }
};

##5. ストリームの読み取り開始(コード)
StartMatchingAllConditionsAsync は複数のフィルター値が AND 条件となり、StartMatchingAnyConditionAsyncOR 条件でのマッチングになりますので、ご注意ください。

await stream.StartMatchingAllConditionsAsync();

##6. コード全体
上記で主要なコードについて説明しましたが、以下はコード全体となります。GitHub にも各ステップのコードを共有しています。

Program.cs
using System;
using System.Threading.Tasks;
using Tweetinvi;

namespace TwitterStreamApiConsole
{
    class Program
    {
        private static readonly int maxCount = 5;
        private static int counter;

        static void Main(string[] args)
        {
            counter = 0;

            // Start Twitter Stream reading with Filtered Stream API
            StartFilteredStream().Wait();

            Console.ReadLine();
        }

        /// <summary>
        /// Twitter Filtered Stream API
        /// Nuget Package : Tweetinvi
        /// https://linvi.github.io/tweetinvi/dist/intro/basic-concepts.html#twitterclient
        /// </summary>
        private static async Task StartFilteredStream()
        {
            Console.WriteLine($"***** Stream started. {DateTime.UtcNow}");

            // User client & stream
            var client = new TwitterClient("<your API Key>", "<your API Secret>", 
                                             "<your Access Token>", "<your Access Token Secret>");
            var stream = client.Streams.CreateFilteredStream();

            // Add filters
            stream.AddTrack("コロナ");
            stream.AddTrack("大変");

            // Read stream
            stream.MatchingTweetReceived += (sender, args) =>
            {
                var lang = args.Tweet.Language;
                // Specify Japanese & Remove bot tweets
                if (lang == Tweetinvi.Models.Language.Japanese && args.Tweet.Source.Contains(">Twitter "))
                {
                    Console.WriteLine("----------------------------------------------------------------------");
                    Console.WriteLine($"** CreatedAt : {args.Tweet.CreatedAt}");
                    Console.WriteLine($"** CreatedBy : {args.Tweet.CreatedBy}");
                    Console.WriteLine($"** Source    : {args.Tweet.Source}");
                    Console.WriteLine($"** Text      : {args.Tweet.Text}");
                }
                ++counter;
                if (counter >= maxCount)
                {
                    stream.Stop();
                }
            };
            await stream.StartMatchingAllConditionsAsync();

            Console.WriteLine();
            Console.WriteLine($"***** Stream stopped. {DateTime.UtcNow} (counter : {counter})");
        }
    }
}

##7. デバッグ実行
デバッグ実行して、以下のようにキーワードに関連したツイートがリアルタイムに表示されれば、成功です。お疲れ様でした。
image.png

#次のステップへ
Step3 では、Hadoop / Spark / Synapse Serverless SQL で標準的に利用される圧縮済みカラムストア型ファイルである Parquet ファイルの作成/出力を C# コードで実施します。

#参照
C# 向け Twitter API SDK (TweetinviAPI) Nuget Package サイト
TweetinviAPI - Filtered Stream API リファレンス
クロス プラットフォーム .NET 概要
Twitter 開発者向けサイト
Twitter API サイト
Visual Studio 2019 Community サイト
Visual Studio Code のサイト

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?