#目的と概要
大学のロボットアームからデータを収集しよう!
ということでロボットに設置したRaspberry PiからMQTTでデータを送信して、データレイクに保存するまでの試験的な実装を考えました。
###Kafkaにデータを送信しなければならない
サーバー側の環境を確認したところ、入ってくるデータはKubernetes上のKafkaが管理しているとのこと。よってMQTTブローカーとKafkaブローカーをつなぐコネクターが必要なので、オープンソースであるFluentdを利用してコネクターを作成することにしました。エッジデバイス内へのMQTTブローカー設置も考えましたが、なるべく負荷を減らしたいのでこれもKubernetesにデプロイします。デバイス => MQTT-Broker => Fluentd => Kafka の流れでデータが移動します。
###本稿の内容
ローカルでminikubeを利用して行なったことを次の順に説明していきます。
- MQTT-Brokerのデプロイメント
- Fluentdのプラグインの追加とDockerイメージの作成
- Fluentdのデプロイメント
#開発環境
- macOS Big Sur, version 11.2.1
- Docker, version 20.10.5
- minikube, v1.18.1
- eclipse-mosquitto, v 2.0.9
- fluentd-kubernetes-daemonset, v1.12.0-debian-kafka2-1.2
- Fluent::Plugin::Mqtt::IO
- fluent-plugin-kafka
#事前の準備
Kubernetesのローカル開発環境としてminikubeを使用しました。minikubeはDockerやVirtual Boxなどの仮想マシンマネージャー上で動作します。今回はDockerを使用しました。
を各ページの指示に従って行いました。動作確認をするためローカルにmosquitto clientのインストールもしておくと便利です。
#MQTT-Brokerのデプロイメント
ここから本題に入ります。
eclipse-mosquittoを利用してブローカーをminikubeにデプロイしました。こちらはdockerイメージをそのまま使用するので、yamlファイルの作成から始めます。まずはデフォルトのmosquitto.conf
ファイルを編集するためにConfigMapを作りました。
###ConfigMapの作成
ConfigMapは設定情報を扱うためのリソースです。各アプリケーション用の設定をConfigMapとしてKubernetesに預けておけば、コンテナのデプロイ時にそれを使って設定ファイルの作成ができます。手元にmosquitto.confがある場合はkubectl create configmap <your-map-name> --from-file=<your-source-flie>
のコマンドで作成できます。yamlファイルから作成する時は次のような記述をします。
apiVersion: v1
kind: ConfigMap
metadata:
name: mosquitto-config-file
data:
mosquitto.conf: |
listener 1883
allow_anonymous false
password_file /mosquitto/secret/passwd
dataタグ下の記述はmosquitto.confという名前のファイルの中身が|
(改行維持の表記)の後に続くテキストにマッピングされることを示しています。allow_anonymous false
にしているのでブローカーの接続にはユーザー名とパスワードが必須になってます。パスワードファイルの保存先はpassword_file /mosquitto/secret/passwd
で設定をしています。
本来はSecretでパスワードファイルのマップを作成するべきですが、デプロイ時のエラーが解決できなかったのと、kafkaへのアクセスはsslを利用するのでひとまずconfigmapで作成しました。mosquittoのパスワードの暗号化はこちらのサイトを参考にしました。
###Deploymentの作成
次のような記述をyamlファイルに追加します。(configmapと同じファイルに記述するときはセパレーター---
を忘れずに!)
apiVersion: apps/v1
kind: Deployment
metadata:
name: mqtt-broker
spec:
selector:
matchLabels:
app: mqtt-broker
replicas: 1 # レプリカの数の指定
template:
metadata:
labels:
app: mqtt-broker
spec:
containers:
- name: mosquitto
image: eclipse-mosquitto #レジストリのホスト名を指定しない場合は、KubernetesはDockerパブリックレジストリを意味していると見なします。
ports:
- containerPort: 1883
volumeMounts: # マウント先の指定
- name: mosquitto-config
mountPath: /mosquitto/config
- name: mosquitto-passwd
mountPath: /mosquitto/secret
readOnly: true
volumes: # マップのソースを指定
- name: mosquitto-config
configMap:
name: mosquitto-config-file #ConfigMapの名前を指定
- name: mosquitto-passwd
configMap:
name: mqtt-passwd
Docker imageの取得レジストリ、設定のマッピング先などを指定してあげます。これでデプロイすれば新しいPodが作成されて、その中でコンテナが動作するようになりました。
###Serviceの作成
次にデバイスやFluentdがこのPodと通信できるようにサービスの記述もyamlに追加します。kubectl expose
コマンドでも可能ですが、次回のデプロイ時にサービス名が違うと困るのでyamlに書きました。
apiVersion: v1
kind: Service
metadata:
name: mosquitto-service
labels:
name: mosquitto
spec:
selector:
app: mqtt-broker
ports:
- port: 1883
name: mosquitto-port
protocol: TCP
デプロイメント
これでyamlの記述は完了です。全体はこんな感じです。
# mosquitto.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: mosquitto-config-file
data:
mosquitto.conf: |
listener 1883
allow_anonymous false
password_file /mosquitto/secret/passwd
#---
## この部分はパスワードファイル用のマップです。記述せずにコマンドでsecretを作成するか、別ファイルからapplyすることをおすすめします
#apiVersion: v1
#kind: ConfigMap
#metadata:
# name: mqtt-passwd
#data:
# passwd: |
# <your-mosquitto-username>:<generated-password>
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: mqtt-broker
spec:
selector:
matchLabels:
app: mqtt-broker
replicas: 1
template:
metadata:
labels:
app: mqtt-broker
spec:
containers:
- name: mosquitto
image: eclipse-mosquitto
ports:
- containerPort: 1883
volumeMounts:
- name: mosquitto-config
mountPath: /mosquitto/config
- name: mosquitto-passwd
mountPath: /mosquitto/secret
readOnly: true
volumes:
- name: mosquitto-config
configMap:
name: mosquitto-config-file
- name: mosquitto-passwd
configMap:
name: mqtt-passwd
---
apiVersion: v1
kind: Service
metadata:
name: mosquitto-service
labels:
name: mosquitto
spec:
selector:
app: mqtt-broker
ports:
- port: 1883
name: mosquitto-port
protocol: TCP
Dockerを起動してminikube start
でクラスターを立ち上げたら、このyamlファイルをkubectl apply -f ファイル
でデプロイします。minikube dashboard
でダッシュボードを起動してデプロイが成功したか確認できます。注意点は、作成したサービスが外部からのアクセスポートを持っていないことです。投稿主のローカル環境ではNodePort
からのアクセスはできなかったので、ポートフォワードをしました。kubectl port-forward svc/mosquitto-service 1883:1883
のコマンドでローカルポート:リモートポート
の間にtunnelを通します。これでlocalhost:1883
からブローカーにアクセスできるようになりました。
実際の環境ではLoadBalancer
またはNodePort
を使って外部IPを設けることになると思います。
#Fluentdのデプロイメント
続いてはFluentdの登場です。Fluentd向けに多数のプラグインが開発されていて、インストールもシンプルです。
(余談ですがRaspberry PiでFluentdを導入した際にbundle
とGemfile
の代わりにfluentd-gem install
を使用したためエラーが出てしまったので、参考になればと思います。またmacにインストールしたtd-agentにプラグインを追加する場合は、通常のfluentd-gem
ではインストールできませんでした。プラグインのインストールには/opt/td-agent/embedded/bin/fluent-gem install <plugin-name>
を使用するようです。)
###FluentdにMQTTプラグインを追加する
fluentd-kubernetes-daemonsetのdebian-kafka2-fluentをダウンロードして編集しました。FluentdのDockerイメージの作成手順は以下の通りです。
-
Gemfile
にfluent-plugin-mqtt-io
を追加します。 - 次に
docker build -t <name>:<tag> .
でイメージを作成して、docker image push <image>
でDocker Hubにプッシュします。
次はfluent.conf
を編集しました。<source>
タグ内に先ほど作成したmosquitto-service
からデータを読み込むための設定を書きます。追加部分の例はこちらです。
<source>
@type mqtt
host "#{ENV['FLUENT_MQTT_BROKER_HOST'] || 'mosquitto-service'}"
port "#{ENV['FLUENT_MQTT_BROKER_PORT'] || '1883'}"
topic "#{ENV['FLUENT_MQTT_SUB_TOPIC'] || 'topic'}"
<security>
username "#{ENV['FLUENT_MQTT_USER'] || 'user'}"
password "#{ENV['FLUENT_MQTT_PASSWORD'] || 'password'}"
</security>
<parse>
@type "#{ENV['FLUENT_MQTT_INPUT_FORMAT_TYPE'] || 'json'}"
</parse>
</source>
アドレス情報の他に、購読するトピックや入力タイプの指定ができます。その他設定についてはFluent::Plugin::Mqtt::IOを参照してください。
###Kafkaプラグインの設定
今回利用しているリポジトリのDockerfileはKafkaプラグインのインストールが設定済みとなっています。(Docker環境のみで利用する場合は、FluentdのDocker Hubにカスタマイズの方法が記載されています。)
また、confファイルに環境変数として定義されているものはyamlファイルから設定ができます。ここではさらにKafkaとの通信のためにTLS/SSLの設定を追加しました。
クライアント認証ができるように、ルート証明書、クライアント証明書、クライアントキーを用意しました。今回はkubectl create secret generic <secret-name> --from-file=ca_cert.pem --from-file=client_cert.pem --from-file=client_key.pem
でSecretを作成しました。
次はfluent.conf
の@type kafka2
を含む<match>
タグのエリアに
ssl_ca_cert /fluentd/ssl/ca_cert.pem
ssl_client_cert /fluentd/ssl/client_cert.pem
ssl_client_cert_key /fluentd/ssl/client_cert_key.pem
の行を追加してクライアント認証を有効にします。(Kafka側には認証できるようにクライアント証明書のルート証明書を入れておきます)
fluentd-kubernetes-daemonsetのページのファイルを参考にFluentd-Kafka用のyamlファイルを作成します。secretのvolumes設定にsecretNameではなくnameを使ってコンソールに怒られました。
volumes:
- name: fluentd-config
configMap:
name: fluent-conf
- name: fluentd-kafka-tls
secret:
secretName: tls-secret
これで設定は完了です。あとはkubectl apply -f
でデプロイして動作確認をしました。
###動作の確認
ローカルのmosquittoからpublishしました。MosquittoからFluentdへのデータ送信については、fluentd.conf
内のKafka用の<match>
タグ内を@stdout
のみに書き換えてデプロイした後、ターミナルかダッシュボードでログを確認しました。Kafka側の出力は既にGrafanaがあったので、ない場合はKafka-Consumer等でログが確認できると思います。
今回の実装で行なったことは以上です。
近いうちにKafka => telegraf/Fluentd => influxDBのルートの実装も予定してるので、そちらに関しても投稿できればと思っています。
#その他参考にしたもの
Kubernetesドキュメント
Kubernetes Fluentd
Kubernetes ConfigMap and Secret as Kubernetes Volumes | Demo (youtube)