#はじめに
- AWS LambdaでRESTでデータ取得してSplunkに送信してみるではBlueprintを使用してLambdaからSplunkへのHEC送信に成功しました。
- しかしBlueprintはNode.jsでした。Splunk管理者はPythonの方が得意だと思いますので(偏見)、Pythonでできないか調べました。
- PythonでSplunk HECに送信するClass(Splunk-Class-httpevent)がありましたので、今回はこちらを使って実現します。
- Splunk-Class-httpeventはレイヤーにしてLambda上のPythonから呼び出せるようにします。
- Splunkで毎日運勢を占えるようになりました。
#環境
- Lambdaからアクセス可能するためEC2などでグローバルIPを持つSplunk Enterprise、もしくはSplunk Cloud
- AWS Lambda
#設定手順の概要
- SplunkでHEC設定
- AWS Lambdaのレイヤー作成(モジュールアップロード)
- AWS Lambdaで関数作成
- CloudWatchでLambdaを定期実行化
#設定手順
##1. SplunkでHEC設定
まずはいつも通り、AWS Lambdaからデータを受け取るためのHECを設定します。
- 設定 > データ入力
- HTTPイベントコレクターの [+ 新規追加] をクリック
- 名前を適当に入力して [次へ] をクリック
- 取り込み先のインデックスを選択 ※今回はaws_lambdaを作成しました。
- 生成されたトークンをメモ
以上です。
念のため、SSLの有効/無効状況やHTTPポート番号をグローバル設定から確認しておきましょう。
##2. AWS Lambdaのレイヤー作成(モジュールアップロード)
PythonからSplunk HECでデータ送信する神ClassをSplunk Trustの方が作っていらっしゃいました。
Splunk-Class-httpevent
今回はこちらをありがたく使わせていただきます。
とは言ってもLambdaではpipでインストールはできません。
その代わりにLambdaのレイヤー機能を使用します。
モジュールなどをzipで固めてレイヤーにアップロードすることで、Lambda関数から呼び出すことができます。
AWS Lambda レイヤー
2-1. zipの準備
ローカルで以下のようにモジュールをpythonディレクトリにダウンロードしzip化します。
※ディレクトリ名は「python」である必要があります。レイヤーとしてアップロードしたときに/opt/配下に展開されるためです。zip名は任意です。
mkdir ./python
pip install git+git://github.com/georgestarcher/Splunk-Class-httpevent.git -d ./python
zip -r python-hec.zip ./python
2-2. レイヤーへのアップロード
Lambdaページのレイヤーメニューから [レイヤーの作成] をクリックします。
適当に名前を付け、作成したzipファイルをアップロードし、互換性のあるランタイムにPythonを選択し、[作成] をクリックします。
これでレイヤー作成は完了です。
次は関数を作成します。
##3. AWS Lambdaで関数作成
3-1. 関数作成
- Lambdaの関数メニューから [関数を作成] をクリック
- [一から作成] を選択し諸々入力
- 関数名:任意
- ランタイム:Python (本記事の作成時点で最新版の3.8)
- 実行ロール:「基本的なLambdaアクセス権限で新しいロールを作成」のまま
- [関数の作成] をクリック
3-2. レイヤーの追加
- 関数ページの下の方にあるレイヤー枠の [レイヤーの追加] をクリック
- 「カスタムレイヤー」を選択し、カスタムレイヤーから先ほどアップロードしたレイヤーとバージョンを選択。
- [追加] をクリックして完了。
これで関数から通常のimportで呼び出せるようになりました。
3-3. Pythonスクリプト作成
それではPythonスクリプトを作っていきましょう。
Splunk-Class-httpeventの使い方としてサンプルが提供されています。
特に重要な箇所は以下です。
# HEC URLおよびトークン
http_event_collector_key = "4D14F8D9-D788-4E6E-BF2D-D1A46441242E"
http_event_collector_host = "localhost"
# Splunkメタデータ
payload = {}
payload.update({"index":"test"})
payload.update({"sourcetype":"crime"})
payload.update({"source":"witness"})
payload.update({"host":"mansion"})
# データ送信(単発)
for i in range(5):
event = commitCrime()
event.update({"action":"success"})
event.update({"crime_type":"single"})
event.update({"crime_number":i})
payload.update({"event":event})
testevent.sendEvent(payload)
# データ送信(バッチ)
for i in range(50000):
event = commitCrime()
event.update({"action":"success"})
event.update({"crime_type":"batch"})
event.update({"crime_number":i})
payload.update({"event":event})
testevent.batchEvent(payload)
testevent.flushBatch()
それでは、今回も適当なREST APIでデータを取ってみましょう。
占いAPIでSplunkで毎日お手軽に占えるようにしたいと思います。
Web ad Fortune 無料版API
※利用規約あり。商用利用の場合はビジネス版あり。
このような感じで年月日を渡せば良いようです。
http://api.jugemkey.jp/api/horoscope/free/year/month/day
これを使ってデータ取得と送信のスクリプトを作ってみます。
できたものがこちらです。
from splunk_http_event_collector import http_event_collector
import requests
import os
import json
import logging
import sys
import datetime
def lambda_handler(event, context):
# init logging config, this would be job of your main code using this class.
logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S %z')
# Create event collector object, default SSL and HTTP Event Collector Port
# HEC URL、トークンは環境変数化。[設定]タブ > [環境変数]から環境変数を作成できる。
http_event_collector_key = os.environ["SPLUNK_HEC_TOKEN"]
http_event_collector_host = os.environ["SPLUNK_HEC_URL"]
testevent = http_event_collector(http_event_collector_key, http_event_collector_host)
# perform a HEC reachable check
hec_reachable = testevent.check_connectivity()
if not hec_reachable:
sys.exit(1)
# Set to pop null fields. Always a good idea
testevent.popNullFields = True
# set logging to DEBUG for example
testevent.log.setLevel(logging.DEBUG)
# Start event payload and add the metadata information
payload = {}
payload.update({"index":"aws_lambda"})
payload.update({"sourcetype":"horoscope"})
payload.update({"source":context.function_name})
payload.update({"host":"lambda"})
# 本日の占い取得。LambdaはUTCで動作するためJSTに変換する。
dt = datetime.datetime.utcnow() + datetime.timedelta(hours=9)
url = "http://api.jugemkey.jp/api/horoscope/free/{}/{}/{}".format(dt.year, dt.month, dt.day)
event = requests.get(url).json()
# Splunk HECへ送信
payload.update({"event":event})
testevent.sendEvent(payload)
さて、これを実行してみます。[Test] ボタンからテストできます。 [Deploy] で更新内容を反映してからテストしてみましょう。
Splunkで見てみましょう。無事取り込めました。すごい!
いい感じに動いたので、次はこのスクリプトを定期実行したいと思います。
##3. CloudWatchでLambdaを定期実行化
前回記事と同様に、CloudWatchで定期実行化します。
早速設定してみましょう。
- 設定 > トリガーから [トリガーを追加] をクリック
- EventBridge (CloudWatch Events) を選択
- ルールタイプで「スケジュール式」を選択。cron(0 15 ? * * *)で毎日日本時間の0時0分に取得するようにします。
- [追加] をクリックすれば定期実行化が完了
これで毎日、その日の運勢を取得してSplunkに送ってくれるようになりました。
##4. 今日の運勢をダッシュボード化
せっかくなのでダッシュボードで本日の運勢を見れるようにしてみましょう。
SPLは以下です。
index=aws_lambda sourcetype=horoscope earliest=@d
| table horoscope*
| foreach horoscope.*.* [rename <<FIELD>> as <<MATCHSEG2>>]
| mvexpand color
| streamstats count
| eval content=mvindex(content,count - 1)
| eval item=mvindex(item,count - 1)
| eval job=mvindex(job,count - 1)
| eval love=mvindex(love,count - 1)
| eval money=mvindex(money,count - 1)
| eval rank=mvindex(rank,count - 1)
| eval sign=mvindex(sign,count - 1)
| eval total=mvindex(total,count - 1)
| fields - count day
| table rank sign content item color job love money total
| sort rank
| rename rank as "ランキング", sign as "星座", content as "占い", "item" as "ラッキーアイテム", "color" as "ラッキーカラー", "job" as "仕事運", love as "恋愛運", money as "金運", total as "総合運"
ダッシュボードにします。
なんならトップページに載せてしまいましょう。
#まとめ
- AWS LambdaでPythonを使用し、RESTでデータ取得、SplunkにHEC送信できるようになりました。
- レイヤーを使えば任意のモジュールをアップロードし呼び出せますのでお好みのモジュールで色々なことができそうです。