0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

MQTT-BrokerとFluentdをminikubeにデプロイしてみた

Posted at

#目的と概要
大学のロボットアームからデータを収集しよう!
ということでロボットに設置したRaspberry PiからMQTTでデータを送信して、データレイクに保存するまでの試験的な実装を考えました。

###Kafkaにデータを送信しなければならない
サーバー側の環境を確認したところ、入ってくるデータはKubernetes上のKafkaが管理しているとのこと。よってMQTTブローカーとKafkaブローカーをつなぐコネクターが必要なので、オープンソースであるFluentdを利用してコネクターを作成することにしました。エッジデバイス内へのMQTTブローカー設置も考えましたが、なるべく負荷を減らしたいのでこれもKubernetesにデプロイします。デバイス => MQTT-Broker => Fluentd => Kafka の流れでデータが移動します。

###本稿の内容
ローカルでminikubeを利用して行なったことを次の順に説明していきます。

  1. MQTT-Brokerのデプロイメント
  2. Fluentdのプラグインの追加とDockerイメージの作成
  3. Fluentdのデプロイメント

#開発環境

#事前の準備
Kubernetesのローカル開発環境としてminikubeを使用しました。minikubeはDockerやVirtual Boxなどの仮想マシンマネージャー上で動作します。今回はDockerを使用しました。

  1. Dockerのインストール
  2. minikubeのインストール

を各ページの指示に従って行いました。動作確認をするためローカルにmosquitto clientのインストールもしておくと便利です。

#MQTT-Brokerのデプロイメント
ここから本題に入ります。
eclipse-mosquittoを利用してブローカーをminikubeにデプロイしました。こちらはdockerイメージをそのまま使用するので、yamlファイルの作成から始めます。まずはデフォルトのmosquitto.confファイルを編集するためにConfigMapを作りました。
###ConfigMapの作成
ConfigMapは設定情報を扱うためのリソースです。各アプリケーション用の設定をConfigMapとしてKubernetesに預けておけば、コンテナのデプロイ時にそれを使って設定ファイルの作成ができます。手元にmosquitto.confがある場合はkubectl create configmap <your-map-name> --from-file=<your-source-flie>のコマンドで作成できます。yamlファイルから作成する時は次のような記述をします。

apiVersion: v1
kind: ConfigMap
metadata:
  name: mosquitto-config-file
data:
  mosquitto.conf: |
    listener 1883
    allow_anonymous false
    password_file /mosquitto/secret/passwd

dataタグ下の記述はmosquitto.confという名前のファイルの中身が|(改行維持の表記)の後に続くテキストにマッピングされることを示しています。allow_anonymous falseにしているのでブローカーの接続にはユーザー名とパスワードが必須になってます。パスワードファイルの保存先はpassword_file /mosquitto/secret/passwdで設定をしています。

本来はSecretでパスワードファイルのマップを作成するべきですが、デプロイ時のエラーが解決できなかったのと、kafkaへのアクセスはsslを利用するのでひとまずconfigmapで作成しました。mosquittoのパスワードの暗号化はこちらのサイトを参考にしました。

###Deploymentの作成
次のような記述をyamlファイルに追加します。(configmapと同じファイルに記述するときはセパレーター---を忘れずに!)

apiVersion: apps/v1 
kind: Deployment
metadata:
  name: mqtt-broker
spec:
  selector:
    matchLabels:
      app: mqtt-broker
  replicas: 1 # レプリカの数の指定
  template:
    metadata:
      labels:
        app: mqtt-broker
    spec:
      containers:
        - name: mosquitto
          image: eclipse-mosquitto #レジストリのホスト名を指定しない場合は、KubernetesはDockerパブリックレジストリを意味していると見なします。
          ports:
            - containerPort: 1883
          volumeMounts: # マウント先の指定
            - name: mosquitto-config
              mountPath: /mosquitto/config
            - name: mosquitto-passwd
              mountPath: /mosquitto/secret
              readOnly: true
      volumes: # マップのソースを指定
        - name: mosquitto-config
          configMap:
            name: mosquitto-config-file  #ConfigMapの名前を指定
        - name: mosquitto-passwd
          configMap:
            name: mqtt-passwd

Docker imageの取得レジストリ、設定のマッピング先などを指定してあげます。これでデプロイすれば新しいPodが作成されて、その中でコンテナが動作するようになりました。

###Serviceの作成
次にデバイスやFluentdがこのPodと通信できるようにサービスの記述もyamlに追加します。kubectl exposeコマンドでも可能ですが、次回のデプロイ時にサービス名が違うと困るのでyamlに書きました。

apiVersion: v1
kind: Service
metadata:
  name: mosquitto-service
  labels:
    name: mosquitto
spec:
  selector:
    app: mqtt-broker
  ports:
  - port: 1883
    name: mosquitto-port
    protocol: TCP

デプロイメント

これでyamlの記述は完了です。全体はこんな感じです。

# mosquitto.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: mosquitto-config-file
data:
  mosquitto.conf: |
    listener 1883
    allow_anonymous false
    password_file /mosquitto/secret/passwd
#---
## この部分はパスワードファイル用のマップです。記述せずにコマンドでsecretを作成するか、別ファイルからapplyすることをおすすめします
#apiVersion: v1
#kind: ConfigMap
#metadata:
#  name: mqtt-passwd
#data:
#  passwd: |
#    <your-mosquitto-username>:<generated-password>   
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mqtt-broker
spec:
  selector:
    matchLabels:
      app: mqtt-broker
  replicas: 1 
  template:
    metadata:
      labels:
        app: mqtt-broker
    spec:
      containers:
        - name: mosquitto
          image: eclipse-mosquitto
          ports:
            - containerPort: 1883
          volumeMounts:
            - name: mosquitto-config
              mountPath: /mosquitto/config
            - name: mosquitto-passwd
              mountPath: /mosquitto/secret
              readOnly: true
      volumes:
        - name: mosquitto-config
          configMap:
            name: mosquitto-config-file
        - name: mosquitto-passwd
          configMap:
            name: mqtt-passwd
---
apiVersion: v1
kind: Service
metadata:
  name: mosquitto-service
  labels:
    name: mosquitto
spec:
  selector:
    app: mqtt-broker
  ports:
  - port: 1883
    name: mosquitto-port
    protocol: TCP

Dockerを起動してminikube startでクラスターを立ち上げたら、このyamlファイルをkubectl apply -f ファイルでデプロイします。minikube dashboardでダッシュボードを起動してデプロイが成功したか確認できます。注意点は、作成したサービスが外部からのアクセスポートを持っていないことです。投稿主のローカル環境ではNodePortからのアクセスはできなかったので、ポートフォワードをしました。kubectl port-forward svc/mosquitto-service 1883:1883のコマンドでローカルポート:リモートポートの間にtunnelを通します。これでlocalhost:1883からブローカーにアクセスできるようになりました。

実際の環境ではLoadBalancerまたはNodePortを使って外部IPを設けることになると思います。
#Fluentdのデプロイメント
続いてはFluentdの登場です。Fluentd向けに多数のプラグインが開発されていて、インストールもシンプルです。

(余談ですがRaspberry PiでFluentdを導入した際にbundleGemfileの代わりにfluentd-gem installを使用したためエラーが出てしまったので、参考になればと思います。またmacにインストールしたtd-agentにプラグインを追加する場合は、通常のfluentd-gemではインストールできませんでした。プラグインのインストールには/opt/td-agent/embedded/bin/fluent-gem install <plugin-name>を使用するようです。)

###FluentdにMQTTプラグインを追加する
fluentd-kubernetes-daemonsetのdebian-kafka2-fluentをダウンロードして編集しました。FluentdのDockerイメージの作成手順は以下の通りです。

  1. Gemfilefluent-plugin-mqtt-ioを追加します。
  2. 次にdocker build -t <name>:<tag> .でイメージを作成して、docker image push <image>でDocker Hubにプッシュします。

次はfluent.confを編集しました。<source>タグ内に先ほど作成したmosquitto-serviceからデータを読み込むための設定を書きます。追加部分の例はこちらです。

<source>
  @type mqtt
  host "#{ENV['FLUENT_MQTT_BROKER_HOST'] || 'mosquitto-service'}"
  port "#{ENV['FLUENT_MQTT_BROKER_PORT'] || '1883'}"
  topic "#{ENV['FLUENT_MQTT_SUB_TOPIC'] || 'topic'}"
  <security>
     username "#{ENV['FLUENT_MQTT_USER'] || 'user'}"
     password "#{ENV['FLUENT_MQTT_PASSWORD'] || 'password'}"
  </security>
  <parse>
    @type "#{ENV['FLUENT_MQTT_INPUT_FORMAT_TYPE'] || 'json'}"
  </parse>
</source>

アドレス情報の他に、購読するトピックや入力タイプの指定ができます。その他設定についてはFluent::Plugin::Mqtt::IOを参照してください。

###Kafkaプラグインの設定
今回利用しているリポジトリのDockerfileはKafkaプラグインのインストールが設定済みとなっています。(Docker環境のみで利用する場合は、FluentdのDocker Hubにカスタマイズの方法が記載されています。)
また、confファイルに環境変数として定義されているものはyamlファイルから設定ができます。ここではさらにKafkaとの通信のためにTLS/SSLの設定を追加しました。

クライアント認証ができるように、ルート証明書、クライアント証明書、クライアントキーを用意しました。今回はkubectl create secret generic <secret-name> --from-file=ca_cert.pem --from-file=client_cert.pem --from-file=client_key.pemでSecretを作成しました。

次はfluent.conf@type kafka2を含む<match>タグのエリアに

ssl_ca_cert /fluentd/ssl/ca_cert.pem
ssl_client_cert /fluentd/ssl/client_cert.pem
ssl_client_cert_key /fluentd/ssl/client_cert_key.pem

の行を追加してクライアント認証を有効にします。(Kafka側には認証できるようにクライアント証明書のルート証明書を入れておきます)

fluentd-kubernetes-daemonsetのページのファイルを参考にFluentd-Kafka用のyamlファイルを作成します。secretのvolumes設定にsecretNameではなくnameを使ってコンソールに怒られました。

volumes:
- name: fluentd-config
        configMap:
            name: fluent-conf
- name: fluentd-kafka-tls
        secret:
            secretName: tls-secret

これで設定は完了です。あとはkubectl apply -fでデプロイして動作確認をしました。

###動作の確認
ローカルのmosquittoからpublishしました。MosquittoからFluentdへのデータ送信については、fluentd.conf内のKafka用の<match>タグ内を@stdoutのみに書き換えてデプロイした後、ターミナルかダッシュボードでログを確認しました。Kafka側の出力は既にGrafanaがあったので、ない場合はKafka-Consumer等でログが確認できると思います。

今回の実装で行なったことは以上です。

近いうちにKafka => telegraf/Fluentd => influxDBのルートの実装も予定してるので、そちらに関しても投稿できればと思っています。

#その他参考にしたもの
Kubernetesドキュメント
Kubernetes Fluentd
Kubernetes ConfigMap and Secret as Kubernetes Volumes | Demo (youtube)

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?