今回は、Step5 です。Step4 で作成したコードを流用します。ここでは、Twitter API についての説明を省略しますので、Twitter API については、Step1、Step2 をご参照ください。
★お詫び:Function App のコードにバグがありました。本ページ、および、GitHub のコードは修正済みです。
<修正前 (誤り)>
if (twMaxTimeSpanMins != null)
maxTimeSpan = new TimeSpan(int.Parse(twMaxTimeSpanMins), 0, 0);
<修正後 (正解)>
if (twMaxTimeSpanMins != null)
maxTimeSpan = new TimeSpan(0, int.Parse(twMaxTimeSpanMins), 0);
#講座の最終目標
Twitter のツイート データを Azure Data Lake Storage Gen2 (ADLS Gen2) に Parquet 形式のファイルとして自動的に蓄積し、Azure Synapse Analytics (Serverless SQL / Apach Spark) を使って分析できるようにします。ツイート データの継続的な取得と ADLS Gen2 へのデータ蓄積には、Azure Functions を利用します。
#Step5 の目標
これまで作成してきたコードをほぼそのまま流用し、Azure Functions に移植します。タイマー起動 (Timer Trigger) により、ADLS Gen2 にツイートデータの永続的な蓄積が出来るようにすることが目標となります。
#Azure Functions 利用における注意点
Azure Functions (Function App) を作成する際、以下のプランが選択できます。
- 従量課金 (消費) プラン
- Functions Premium プラン
- App Service プラン
以下は、プラン別のタイムアウト期間を表しています。既定値は、host.json の functionTimeout パラメーターによって変更できますが、従量課金プランの場合、月の実行回数・実行時間によっては無償になる為、1 回起動時の最大実行時間が 10 分に制限されていますので、ご注意ください。
バッチ処理として本来望ましいのは、App Service プランです。複数の Function App を 1 つの App Service プラン (コンピューティング) に割り当てることができますので、凝集効果を得られ、必ずしも高価な訳ではありませんが、今回のアプリのみを試すことだけが目的であれば、従量課金プランをご利用ください。
#開発環境 (OSS)
開発には、以下を利用します。OSS (無償) かつ クロス プラットフォームとなりますので、Windows / Mac / Linux などお好きな OS/デバイスをご利用ください。
- ツール : Visual Studio 2019 Community or VS Code
- ランタイム : .NET 5 or .NET Core 3.1
- 言語 : C#
#C# コードの開発
今回の手順では、Visual Studio 2019 を利用します。
##1. 新規にプロジェクトを作成します
Visual Studio 2019 を起動し、「新しいプロジェクトの作成」で、Azure Functions のテンプレートを選択します。プロジェクト名は、Azure Functions のデプロイ名 (Function App 名) にも使えるように、「<自分のニックネーム>-tweets」(私の場合、dokums-tweets) などとしておきます。
「Timer trigger」を選択して、「作成」を行います。Visual Studio で Function App を作成した場合、コンパイル型 (実行が高速) となり、Azure ポータルからは編集できませんので、ご注意ください。
##2. プロジェクト作成直後のコードの確認
以下は、プロジェクト作成直後の Function1.cs のコードを示しています。属性として、FunctionName が指定されており、ここの値 ("Function1") が関数名となり、実行上のエントリーポイントになります。これまで作成してきたコンソール アプリケーションでは、Main メソッドに相当します。
using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
namespace dokums_tweets
{
public static class Function1
{
[FunctionName("Function1")]
public static void Run([TimerTrigger("0 */5 * * * *")]TimerInfo myTimer, ILogger log)
{
log.LogInformation($"C# Timer trigger function executed at: {DateTime.Now}");
}
}
}
##3. ファイル名やコード上の名前の変更
ファイル名や関数名などを変更しておきます。
- ファイル名:「Function1.cs」 → 「Tweets.cs」
- クラス名:「Function1」 → 「Tweets」
- 関数名:「Function1」 → 「StoreTweetData」
using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
namespace dokums_tweets
{
public static class Tweets
{
[FunctionName("StoreTweetData")]
public static void Run([TimerTrigger("0 */5 * * * *")]TimerInfo myTimer, ILogger log)
{
log.LogInformation($"C# Timer trigger function executed at: {DateTime.Now}");
}
}
}
##4. タイムアウト時間の変更
host.json ファイルに functionTimeout パラメーターを追加します。従量課金プランが選択された場合は 10 分、その他のプランの場合は 1 時間に変更します。タイムアウトとは関係ありませんが、モニタリングの為の applicationInsights パラメーターについて、サンプリング指定を無効化する為、isEnabled を false にしておきます。
{
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": false,
"excludedTypes": "Request"
}
}
},
"functionTimeout": "00:10:00"
}
##5. 実行間隔の変更(コード)
1 時間に 1 度起動するように、TimerTrigger 属性を変更します。
[FunctionName("StoreTweetData")]
public static void Run([TimerTrigger("0 0 */1 * * *")]TimerInfo myTimer, ILogger log)
##6. 各種設定値の外部変数化
local.settings.json ファイルに各種設定値をパラメーターとして外部変数化します。TwitterApiKey 以降が追加のパラメーターとなります。Azure へのデプロイ後は、Function App の「構成」メニューで定義するパラメーターから local.settings.json 同様に環境変数として読み取りができます。
Visual Studio で自動生成された local.settings.json ファイルのエンコードは、ASCII になっている為、メモ帳などで一度ファイルを開いて、UTF8 で保存し直してください。そのままだと、アプリで読み込んだ時に、日本語が文字化けします。
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"TwitterApiKey": "<your Twitter API Key>",
"TwitterApiSecret": "<your Twitter API Secret>",
"TwitterAccessToken": "<your Twitter Access Token>",
"TwitterAccessTokenSecret": "<your Twitter Access Token Secret>",
"StorageConnectionString": "<your ADLS Gen2 storage account connection string>",
"StorageContinerName": "<your ADLS Gen2 Filesystem (Container) name>",
"TwMaxTimeSpanMins": "9",
"TwMaxCount": "1000",
"TwCommitInterval": "100",
"TwKeywords": "コロナ 大変"
}
}
- TwMaxTimeSpanMins : 処理時間による終了条件 (Functions のタイムアウト値よりも短くしておきます)
- TwMaxCount : 処理行数による終了条件 (50 万ツイート/月のレート制限を考慮して設定します)
- TwCommitInterval : Parquet ファイル内の最大行数 (※1)
- TwKeywords : スペース (半角 or 全角の空白) 区切りで、キーワードを指定します
※1: Parquet ファイルはある程度大きい方が分析時の IO 効率が良いのですが、取り込み失敗を少なくする為、100 程度にしておきます
##7. Nuget パッケージの追加
Visual Studio の「Nuget パッケージの管理」機能を使って、Step 4 までに利用したすべての SDK と Microsoft.ApplicationInsights.WorkerService を追加します。
##8. using ディレクティブの追加(コード)
以下のように必要なライブラリ分を追加します。
using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using Tweetinvi;
using Parquet.Data;
using Parquet;
using Azure.Storage.Blobs;
##9. グローバル変数の定義(コード)
外部設定から読み出した値を格納したり、コード全体で共有する為のグローバル変数を定義します。
private static readonly object countLock = new object();
private static string twitterApiKey;
private static string twitterApiSecret;
private static string twitterAccessToken;
private static string twitterAccessTokenSecret;
private static string storageConnectionString;
private static string storageContainerName;
private static TimeSpan maxTimeSpan = new TimeSpan(1, 0, 0); // Max lifecycle time per launch (Check 'functionTimeout' in host.json)
//private static int maxCount = 672; // 500,000÷31÷24
private static int maxCount = 1000; // Max number of Tweets retrieved per launch (Default:1000)
private static int commitInterval = 100; // Max number of Tweets retrieved per loop (Default:100)
private static int counter;
private static DateTime startDt;
private static string blobFileName;
private static string filteredKeywords;
private static bool completed = false;
##10. 外部設定値の読み出し(コード)
local.settings.json や Function App の「構成」で定義されたパラメーターを読み出して、グローバル変数に設定します。
// Environment Variables
twitterApiKey = Environment.GetEnvironmentVariable("TwitterApiKey");
twitterApiSecret = Environment.GetEnvironmentVariable("TwitterApiSecret");
twitterAccessToken = Environment.GetEnvironmentVariable("TwitterAccessToken");
twitterAccessTokenSecret = Environment.GetEnvironmentVariable("TwitterAccessTokenSecret");
storageConnectionString = Environment.GetEnvironmentVariable("StorageConnectionString");
storageContainerName = Environment.GetEnvironmentVariable("StorageContinerName");
string twMaxTimeSpanMins = Environment.GetEnvironmentVariable("TwMaxTimeSpanMins");
if (twMaxTimeSpanMins != null)
maxTimeSpan = new TimeSpan(0, int.Parse(twMaxTimeSpanMins), 0);
string twMaxCount = Environment.GetEnvironmentVariable("TwMaxCount");
if (twMaxCount != null)
maxCount = int.Parse(twMaxCount);
string twCommitInterval = Environment.GetEnvironmentVariable("TwCommitInterval");
if (twCommitInterval != null)
commitInterval = int.Parse(twCommitInterval);
filteredKeywords = Environment.GetEnvironmentVariable("TwKeywords"); // Check whether local.settings.json is UTF8 or not
if (filteredKeywords == null)
throw new ApplicationException("TwKeywords not set");
##11. メイン処理(コード)
Parquet ファイル出力のコミット条件、ストリームから読み出したツイートの件数や時間などの終了条件に合わせて制御する為、Run メソッドでループ処理を行います。
// Initialize values
startDt = DateTime.UtcNow;
completed = false;
counter = 0;
// Start Twitter Stream reading loop
while (true)
{
var dt = DateTime.UtcNow;
blobFileName = $"./tweetdata/{dt.ToString("yyyy")}/{dt.ToString("MM")}/{dt.ToString("dd")}/{dt.ToString("HH")}/tw_{Guid.NewGuid().ToString("D")}.parquet";
var tweets = new TweetsEntity();
tweets.CreatedAt = new List<DateTimeOffset>();
tweets.CreatedBy = new List<string>();
tweets.Source = new List<string>();
tweets.Text = new List<string>();
StartFilteredStream(tweets, commitInterval, log).Wait();
if (completed)
break;
Thread.Sleep(1000);
}
##12. ストリーム処理(コード)
Twitter API のセッションが途中で終了することを想定して、特定の例外が発生した場合にリトライする処理を記述します。
while (true)
{
try
{
/*----- ストリーム処理 -----*/
}
catch (Exception ex)
{
if (ex.Message.Contains("The response ended prematurely."))
{
stream?.Stop();
await Task.Delay(1000);
log.LogInformation($"***** Retry to start stream : {DateTime.UtcNow}");
}
else
throw;
}
}
外部設定したフィルター用キーワードを空白区切りで複数設定できるようにしておきます。
// Add filters
log.LogInformation($"***** Filtered Keywords : {filteredKeywords}");
var keywords = filteredKeywords.Split(new char[] { ' ', ' ' });
foreach (var keyword in keywords)
{
stream.AddTrack(keyword);
}
コミット条件、終了条件を設定します。グローバル変数の counter については、マルチスレッドによる同時オペレーションを排除する為、lock を利用して保護します。
lock (countLock)
{
++counter;
}
if (counter >= maxCount || (DateTime.UtcNow - startDt) >= maxTimeSpan)
{
stream.Stop();
completed = true;
}
else if ((counter - prevCounter) >= commitInterval)
{
stream.Stop();
}
##13. コード全体
上記で主要なコードについて説明しましたが、以下はコード全体となります。GitHub にも各ステップのコードを共有しています。
using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using Tweetinvi;
using Parquet.Data;
using Parquet;
using Azure.Storage.Blobs;
namespace dokums_tweets
{
public static class Tweets
{
private static readonly object countLock = new object();
private static string twitterApiKey;
private static string twitterApiSecret;
private static string twitterAccessToken;
private static string twitterAccessTokenSecret;
private static string storageConnectionString;
private static string storageContainerName;
private static TimeSpan maxTimeSpan = new TimeSpan(1, 0, 0); // Max lifecycle time per launch (Check 'functionTimeout' in host.json)
//private static int maxCount = 672; // 500,000÷31÷24
private static int maxCount = 1000; // Max number of Tweets retrieved per launch (Default:1000)
private static int commitInterval = 100; // Max number of Tweets retrieved per loop (Default:100)
private static int counter;
private static DateTime startDt;
private static string blobFileName;
private static string filteredKeywords;
private static bool completed = false;
[FunctionName("StoreTweetData")]
public static void Run([TimerTrigger("0 0 */1 * * *")]TimerInfo myTimer, ILogger log)
{
log.LogInformation($"********** StoreTweetData Function started. **********");
// Environment Variables
twitterApiKey = Environment.GetEnvironmentVariable("TwitterApiKey");
twitterApiSecret = Environment.GetEnvironmentVariable("TwitterApiSecret");
twitterAccessToken = Environment.GetEnvironmentVariable("TwitterAccessToken");
twitterAccessTokenSecret = Environment.GetEnvironmentVariable("TwitterAccessTokenSecret");
storageConnectionString = Environment.GetEnvironmentVariable("StorageConnectionString");
storageContainerName = Environment.GetEnvironmentVariable("StorageContinerName");
string twMaxTimeSpanMins = Environment.GetEnvironmentVariable("TwMaxTimeSpanMins");
if (twMaxTimeSpanMins != null)
maxTimeSpan = new TimeSpan(0, int.Parse(twMaxTimeSpanMins), 0);
string twMaxCount = Environment.GetEnvironmentVariable("TwMaxCount");
if (twMaxCount != null)
maxCount = int.Parse(twMaxCount);
string twCommitInterval = Environment.GetEnvironmentVariable("TwCommitInterval");
if (twCommitInterval != null)
commitInterval = int.Parse(twCommitInterval);
filteredKeywords = Environment.GetEnvironmentVariable("TwKeywords"); // Check whether local.settings.json is UTF8 or not
if (filteredKeywords == null)
throw new ApplicationException("TwKeywords not set");
// Initialize values
startDt = DateTime.UtcNow;
completed = false;
counter = 0;
// Start Twitter Stream reading loop
while (true)
{
var dt = DateTime.UtcNow;
blobFileName = $"./tweetdata/{dt.ToString("yyyy")}/{dt.ToString("MM")}/{dt.ToString("dd")}/{dt.ToString("HH")}/tw_{Guid.NewGuid().ToString("D")}.parquet";
var tweets = new TweetsEntity();
tweets.CreatedAt = new List<DateTimeOffset>();
tweets.CreatedBy = new List<string>();
tweets.Source = new List<string>();
tweets.Text = new List<string>();
StartFilteredStream(tweets, commitInterval, log).Wait();
if (completed)
break;
Thread.Sleep(1000);
}
log.LogInformation($"********** StoreTweetData Function ended. **********");
}
private static async Task StartFilteredStream(TweetsEntity tweets, int commitInterval, ILogger log)
{
log.LogInformation($"***** Twitter Stream started : {DateTime.UtcNow}");
var prevCounter = counter;
Tweetinvi.Streaming.IFilteredStream stream = null;
TwitterClient client = null;
while (true)
{
try
{
// User client & stream (Get values from Twitter Developer Portal)
client = new TwitterClient(twitterApiKey, twitterApiSecret, twitterAccessToken, twitterAccessTokenSecret);
stream = client.Streams.CreateFilteredStream();
// Add filters
log.LogInformation($"***** Filtered Keywords : {filteredKeywords}");
var keywords = filteredKeywords.Split(new char[] { ' ', ' ' });
foreach (var keyword in keywords)
{
stream.AddTrack(keyword);
}
// Read stream
stream.MatchingTweetReceived += (sender, args) =>
{
var lang = args.Tweet.Language;
//***** Specify Japanese & Remove Bot
if (lang == Tweetinvi.Models.Language.Japanese && args.Tweet.Source.Contains(">Twitter "))
{
log.LogInformation("----------------------------------------------------------------------");
log.LogInformation($"** CreatedAt : {args.Tweet.CreatedAt}");
log.LogInformation($"** CreatedBy : {args.Tweet.CreatedBy}");
log.LogInformation($"** Source : {args.Tweet.Source}");
log.LogInformation($"** Text : {args.Tweet.Text}");
tweets.CreatedAt.Add(args.Tweet.CreatedAt);
tweets.CreatedBy.Add(args.Tweet.CreatedBy.ToString());
var source = args.Tweet.Source;
var position = source.IndexOf(">");
source = source.Substring(position + 1);
position = source.IndexOf("<");
source = source.Substring(0, position);
tweets.Source.Add(source);
tweets.Text.Add(args.Tweet.Text);
}
lock (countLock)
{
++counter;
}
if (counter >= maxCount || (DateTime.UtcNow - startDt) >= maxTimeSpan)
{
stream.Stop();
completed = true;
}
else if ((counter - prevCounter) >= commitInterval)
{
stream.Stop();
}
};
await stream.StartMatchingAllConditionsAsync();
log.LogInformation($"***** Twitter Stream stopped : {DateTime.UtcNow} (counter : {counter})");
break;
}
catch (Exception ex)
{
if (ex.Message.Contains("The response ended prematurely."))
{
stream?.Stop();
await Task.Delay(1000);
log.LogInformation($"***** Retry to start stream : {DateTime.UtcNow}");
}
else
throw;
}
}
// Write tweet data to blob storage
await CreateParquetFile(tweets);
log.LogInformation($"***** Tweet Data stored to Blob : {DateTime.UtcNow}");
}
private static async Task CreateParquetFile(TweetsEntity tweets)
{
////////////////////////////////////////////////////////////////////////////////////////
/// Write Parquet file
/// https://github.com/aloneguid/parquet-dotnet
/// https://docs.microsoft.com/ja-jp/azure/storage/blobs/storage-quickstart-blobs-dotnet
////////////////////////////////////////////////////////////////////////////////////////
// create data columns with schema metadata and the data
var createdAtColumn = new Parquet.Data.DataColumn(
new DataField<DateTimeOffset>("CreatedAt"),
tweets.CreatedAt.ToArray()
);
var createdByColumn = new Parquet.Data.DataColumn(
new DataField<string>("CreatedBy"),
tweets.CreatedBy.ToArray()
);
var sourceColumn = new Parquet.Data.DataColumn(
new DataField<string>("Source"),
tweets.Source.ToArray()
);
var textColumn = new Parquet.Data.DataColumn(
new DataField<string>("Text"),
tweets.Text.ToArray()
);
// create file schema
var schema = new Schema(createdAtColumn.Field, createdByColumn.Field, sourceColumn.Field, textColumn.Field);
// create file
Stream stream = new MemoryStream();
using (var parquetWriter = new ParquetWriter(schema, stream))
{
// create a new row group in the file
using (ParquetRowGroupWriter groupWriter = parquetWriter.CreateRowGroup())
{
groupWriter.WriteColumn(createdAtColumn);
groupWriter.WriteColumn(createdByColumn);
groupWriter.WriteColumn(sourceColumn);
groupWriter.WriteColumn(textColumn);
}
}
// Write to Blob storage
var blobServiceClient = new BlobServiceClient(storageConnectionString);
var containerClient = blobServiceClient.GetBlobContainerClient(storageContainerName);
// Get a reference to a blob
BlobClient blobClient = containerClient.GetBlobClient(blobFileName);
stream.Position = 0;
await blobClient.UploadAsync(stream);
stream.Close();
}
private class TweetsEntity
{
public List<DateTimeOffset> CreatedAt { set; get; }
public List<string> CreatedBy { set; get; }
public List<string> Source { set; get; }
public List<string> Text { set; get; }
}
}
}
##14. デバッグ実行
テストし易いように、TimerTrigger 属性と local.settings.json の以下のパラメーターを変更して、デバッグ実行します。3 分おきの起動となりますので、ご注意ください。
public static void Run([TimerTrigger("0 */3 * * * *")]TimerInfo myTimer, ILogger log)
"TwMaxCount": "20",
"TwCommitInterval": "3",
#Function App の Azure へのデプロイ
##手順 (1)
Visual Studio のソリューション エクスプローラーで、プロジェクトを右クリックして、「発行」を選択します。
##手順 (2)
ターゲットとして、「Azure」を選択し、「次へ」ボタンをクリックします。
##手順 (3)
特定のターゲットとして、「Azure Function App (Windows)」 or 「Azure Function App (Linux)」を選択します。どちらでも良いですが、今回は、Windows 版を選択し、「次へ」ボタンをクリックします。
##手順 (4)
Function App を事前に作成していれば、対象アプリを選択し、無ければ、「+」ボタンで新規に作成します。
ここでは、先述した「プラン」を選択することになります。ストレージについては、ADLS Gen2 ではなく、汎用v2 のストレージを選択、または、新規で作成することになります。「作成」ボタンをクリックすると、Function App インスタンスが作成されます。
##手順 (5)
作成された Function App インスタンスが選択された状態になっていますので、このまま「完了」ボタンをクリックします。
##手順 (6)
Function App の稼働状況やログをモニタリング可能な Application Insights (Azure Monitor の機能) を構成します。未構成だと、黄色のビックリマークが表示されていますので、「構成」リンクをクリックします。
割り当てる Application Insights が無ければ、「+」ボタンで新規作成を行います。
##手順 (7)
作成された Application Insights インスタンスが選択された状態になっていますので、このまま「次へ」ボタンをクリックします。
このまま「次へ」ボタンをクリックします。
「完了」ボタンをクリックします。
##手順 (8)
Application Insights の状態が、「構成済み」になったら、「発行」ボタンをクリックします。
以下のように「正常に公開されました」となれば、デプロイは成功です。
#Function App の構成 (パラメーター設定)
Azure ポータルから対象の Function App を表示し、「構成」メニューを選択します。以下の「アプリケーション設定」に local.settings.json で定義したパラメーターを追加していきます。
「+ 新しいアプリケーション設定」ボタンをクリックし、パラメーターと設定値を追加します。
最後に上部にある「保存」ボタンをクリックすると、Function App の再起動が入り、環境変数として反映されます。
#Function App のモニタリング
Azure ポータルから対象の Function App を表示し、「関数」メニューから対象の関数「StoreTweetData」リンクをクリックします。「モニター」メニューを表示すると、実行履歴が表示されます。各実行日時のリンクをクリックすると、その日時で出力されたログが表示されます。
関数の実行が「成功」し、ADLS Gen2 に Parquet ファイルが出力されていれば、Step5 の目標は達成したことになります。お疲れ様でした。
#次のステップへ
Step6 では、Azure Synapse Analytics - Serverless SQL pool を利用して、Parquet ファイルに対して SQL 文で分析をしてみます。
#参照
Azure Functions 概要
Azure Functions - host.json 設定パラメーター
Azure Storage Blob Nuget Package サイト
Apache Parquet for .Net Platform サイト
C# 向け Twitter API SDK (TweetinviAPI) Nuget Package サイト
TweetinviAPI - Filtered Stream API リファレンス
クロス プラットフォーム .NET 概要
Twitter 開発者向けサイト
Twitter API サイト
Visual Studio 2019 Community サイト
Visual Studio Code のサイト