LoginSignup
0

More than 1 year has passed since last update.

k3sクラスタにMQTTサーバ(EMQX)をhelmで導入

Posted at

AWS上にk3sクラスタを構築したので、そこにMQTTサーバを導入したいと思います。MQTTサーバにはEMQXを使いたいと思います。ただ、一からStatefulSetを作るのには挫折してしまいました。helmを利用してインストールしたいと思います。

1.ソフトウェア構成

※AWS上の4インスタンスに構築したk3sクラスタ構成
Debian GNU/Linux 11
k3s version v1.23.8+k3s1 (53f2d4e7)
go version go1.17.5
Longhorn v1.3.0
Node-RED 2.2.2(latest)
EMQX v5.0.3(docker image emqx/emqx:latest)

2.インストール手順

helmレポジトリ追加

helmレポジトリを追加し確認します。

$ helm repo add emqx https://repos.emqx.io/charts
$ helm repo update
$ helm search repo emqx
NAME           CHART VERSION  APP VERSION DESCRIPTION
emqx/emqx      v4.0.0         v4.0.0      A Helm chart for EMQX
emqx/emqx-ee v4.0.0           v4.0.0      A Helm chart for EMQX
emqx/kuiper    0.1.1          0.1.1       A lightweight IoT edge analytic software

valuesファイルの編集

インストール時にカスタマイズしたいのでvaluesファイルを取得します。

$ helm inspect values emqx/emqx > emqx_values.yaml

変更前

変更前のvaluesファイルを以下に示します。

emqx_values.yaml(変更前)
affinity: {}
containerSecurityContext:
  enabled: true
  runAsNonRoot: true
  runAsUser: 1000
emqxConfig:
  EMQX_CLUSTER__DISCOVERY_STRATEGY: k8s
  EMQX_CLUSTER__K8S__ADDRESS_TYPE: hostname
  EMQX_CLUSTER__K8S__APISERVER: https://kubernetes.default.svc:443
  EMQX_CLUSTER__K8S__NAMESPACE: '{{ .Release.Namespace }}'
  EMQX_CLUSTER__K8S__SERVICE_NAME: '{{ .Release.Name }}-headless'
  EMQX_CLUSTER__K8S__SUFFIX: svc.cluster.local
  EMQX_DASHBOARD__DEFAULT_PASSWORD: public
  EMQX_DASHBOARD__DEFAULT_USERNAME: admin
emqxLicenseSecretName: null
image:
  pullPolicy: IfNotPresent
  repository: emqx/emqx
ingress:
  dashboard:
    annotations: {}
    enabled: false
    hosts:
    - dashboard.emqx.local
    path: /
    pathType: ImplementationSpecific
    tls: []
initContainers: {}
metrics:
  enabled: false
  type: prometheus
nodeSelector: {}
persistence:
  accessMode: ReadWriteOnce
  enabled: false
  size: 20Mi
  storageClassName: ""
podAnnotations: {}
podManagementPolicy: Parallel
podSecurityContext:
  enabled: true
  fsGroup: 1000
  fsGroupChangePolicy: Always
  runAsUser: 1000
  supplementalGroups:
  - 1000
recreatePods: false
replicaCount: 3
resources: {}
service:
  annotations: {}
  dashboard: 18083
  externalIPs: []
  loadBalancerSourceRanges: []
  mgmt: 8081
  mqtt: 1883
  mqttssl: 8883
  nodePorts:
    dashboard: null
    dashboardtls: null
    mgmt: null
    mqtt: null
    mqttssl: null
    ws: null
    wss: null
  type: ClusterIP
  ws: 8083
  wss: 8084
tolerations: []

変更点

valuesファイルを変更していきます。まず、MQTTサーバを動かすノードを指定したかったので、nodeSelectorを使用します。

nodeSelector:
  longhorn-node: 'true'

またせっかくlonghornを導入しているので、longhornのボリュームを利用するよう以下の指定を行いました。どれだけ意味のある事かは分かりませんが(笑)

persistence:
  accessMode: ReadWriteOnce
  enabled: true
  size: 20Mi
  storageClassName: longhorn

変更後

変更後のvaluesファイルを以下に示します。

emqx_values.yaml(変更後)
affinity: {}
containerSecurityContext:
  enabled: true
  runAsNonRoot: true
  runAsUser: 1000
emqxConfig:
  EMQX_CLUSTER__DISCOVERY_STRATEGY: k8s
  EMQX_CLUSTER__K8S__ADDRESS_TYPE: hostname
  EMQX_CLUSTER__K8S__APISERVER: https://kubernetes.default.svc:443
  EMQX_CLUSTER__K8S__NAMESPACE: '{{ .Release.Namespace }}'
  EMQX_CLUSTER__K8S__SERVICE_NAME: '{{ .Release.Name }}-headless'
  EMQX_CLUSTER__K8S__SUFFIX: svc.cluster.local
  EMQX_DASHBOARD__DEFAULT_PASSWORD: public
  EMQX_DASHBOARD__DEFAULT_USERNAME: admin
emqxLicenseSecretName: null
image:
  pullPolicy: IfNotPresent
  repository: emqx/emqx
ingress:
  dashboard:
    annotations: {}
    enabled: false
    hosts:
    - dashboard.emqx.local
    path: /
    pathType: ImplementationSpecific
    tls: []
initContainers: {}
metrics:
  enabled: false
  type: prometheus
nodeSelector:
  longhorn-node: 'true'
persistence:
  accessMode: ReadWriteOnce
  enabled: true
  size: 20Mi
  storageClassName: longhorn
podAnnotations: {}
podManagementPolicy: Parallel
podSecurityContext:
  enabled: true
  fsGroup: 1000
  fsGroupChangePolicy: Always
  runAsUser: 1000
  supplementalGroups:
  - 1000
recreatePods: false
replicaCount: 3
resources: {}
service:
  annotations: {}
  dashboard: 18083
  externalIPs: []
  loadBalancerSourceRanges: []
  mgmt: 8081
  mqtt: 1883
  mqttssl: 8883
  nodePorts:
    dashboard: null
    dashboardtls: null
    mgmt: null
    mqtt: null
    mqttssl: null
    ws: null
    wss: null
  type: ClusterIP
  ws: 8083
  wss: 8084
tolerations: []

emqxインストールと動作確認

変更したemqx_values.yamlを-fオプションにとって、emqxをインストールします。

$helm install -f .\emqx_values.yaml  emqx-cluster emqx/emqx

インストール後、動作確認をしてみます。Cluster statusに三つのノードが登録されていれば、クラスタが成立できています。

$ kubectl get pods
NAME                       READY   STATUS    RESTARTS   AGE
emqx-cluster-0             1/1     Running   0          51s
emqx-cluster-1             1/1     Running   0          52s
emqx-cluster-2             1/1     Running   0          55s

$ kubectl exec -it emqx-cluster-0 -- emqx_ctl status
Node 'emqx@emqx-cluster-0.emqx-cluster-headless.default.svc.cluster.local' 5.0.3 is started

$ kubectl exec -it emqx-cluster-0 -- emqx_ctl cluster status
Cluster status: #{running_nodes =>
                      ['emqx@emqx-cluster-0.emqx-cluster-headless.default.svc.cluster.local',
                       'emqx@emqx-cluster-1.emqx-cluster-headless.default.svc.cluster.local',
                       'emqx@emqx-cluster-2.emqx-cluster-headless.default.svc.cluster.local'],
                  stopped_nodes => []}

emqxのクラスタはどうアドレス解決されるのかも見てみましょう。

kubectl get svc
NAME                    TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)
   AGE
emqx-cluster            ClusterIP   10.43.135.251   <none>        1883/TCP,8883/TCP,8083/TCP,8084/TCP,18083/TCP
   23h
emqx-cluster-headless   ClusterIP   None            <none>        1883/TCP,8883/TCP,8083/TCP,8084/TCP,18083/TCP,4370/TCP   23h
kubernetes              ClusterIP   10.43.0.1       <none>        443/TCP
   19d
nodered                 NodePort    10.43.158.49    <none>        1880:31880/TCP
   23h

emqx-clusterと指定すればよさそうです。

3.Node-redとEMQXを連携させる

Node-redですこぶる単純なフローを作って、すこぶる簡単な動作確認をしてみましょう。

フローを作る

タイムスタンプを/testのトピックにpublishし、それを購読して形式変換してdebugノードにつなぐだけの、簡単なフローを作ります。
フロー.png
MQTTノードでMQTTサーバの設定を行います。先ほど確認したように、サーバの項にemqx-clusterと指定することで、MQTTサーバにつなぐことができます。
サーバ設定.png
MQTTサーバへのpub/sebノードはそれぞれ以下の通りに設定しました。
pub.png
sub.png

タイムスタンプノードのボタンを押すとメッセージが発行されます。一回押すごとにdebugウィンドウに二つデバッグメッセージが表示されたら成功です。

フローのデータ

フローを書き出した結果を以下に示します。これを取り込めばフローを再現できますが、この程度のフローならフローエディタ上で一から作ってもたいした事ないかもしれませんね。

[
    {
        "id": "18a5817d19127e87",
        "type": "tab",
        "label": "フロー 1",
        "disabled": false,
        "info": "",
        "env": []
    },
    {
        "id": "81dda3a8d2de1bd0",
        "type": "inject",
        "z": "18a5817d19127e87",
        "name": "",
        "props": [
            {
                "p": "payload"
            },
            {
                "p": "topic",
                "vt": "str"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "",
        "payload": "",
        "payloadType": "date",
        "x": 160,
        "y": 120,
        "wires": [
            [
                "44048fa656dee19c",
                "40cd85ebdf0572d7"
            ]
        ]
    },
    {
        "id": "44048fa656dee19c",
        "type": "mqtt out",
        "z": "18a5817d19127e87",
        "name": "",
        "topic": "/test",
        "qos": "2",
        "retain": "true",
        "respTopic": "",
        "contentType": "",
        "userProps": "",
        "correl": "",
        "expiry": "",
        "broker": "b9dd513617f7e747",
        "x": 810,
        "y": 120,
        "wires": []
    },
    {
        "id": "85f5fa3d75e2e169",
        "type": "mqtt in",
        "z": "18a5817d19127e87",
        "name": "",
        "topic": "/test",
        "qos": "2",
        "datatype": "auto-detect",
        "broker": "b9dd513617f7e747",
        "nl": false,
        "rap": true,
        "rh": 0,
        "inputs": 0,
        "x": 130,
        "y": 240,
        "wires": [
            [
                "e440be6a4380957f"
            ]
        ]
    },
    {
        "id": "40cd85ebdf0572d7",
        "type": "debug",
        "z": "18a5817d19127e87",
        "name": "debug 1",
        "active": true,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "false",
        "statusVal": "",
        "statusType": "auto",
        "x": 820,
        "y": 180,
        "wires": []
    },
    {
        "id": "e440be6a4380957f",
        "type": "moment",
        "z": "18a5817d19127e87",
        "name": "",
        "topic": "",
        "input": "",
        "inputType": "msg",
        "inTz": "Asia/TOKYO",
        "adjAmount": 0,
        "adjType": "days",
        "adjDir": "add",
        "format": "YYYY/MM/DD HH:mm:ss",
        "locale": "ja-JP",
        "output": "",
        "outputType": "msg",
        "outTz": "Asia/TOKYO",
        "x": 420,
        "y": 240,
        "wires": [
            [
                "a8905cccf8e845e8"
            ]
        ]
    },
    {
        "id": "a8905cccf8e845e8",
        "type": "debug",
        "z": "18a5817d19127e87",
        "name": "debug 2",
        "active": true,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "false",
        "statusVal": "",
        "statusType": "auto",
        "x": 820,
        "y": 240,
        "wires": []
    },
    {
        "id": "b9dd513617f7e747",
        "type": "mqtt-broker",
        "name": "emqx-cluster",
        "broker": "emqx-cluster",
        "port": "1883",
        "clientid": "",
        "autoConnect": true,
        "usetls": false,
        "protocolVersion": "5",
        "keepalive": "60",
        "cleansession": true,
        "birthTopic": "",
        "birthQos": "0",
        "birthPayload": "",
        "birthMsg": {},
        "closeTopic": "",
        "closeQos": "0",
        "closePayload": "",
        "closeMsg": {},
        "willTopic": "",
        "willQos": "0",
        "willPayload": "",
        "willMsg": {},
        "userProps": "",
        "sessionExpiry": ""
    }
]

4.参考文献

Deploying EMQX 4.0 cluster on Kubernetes via Helm3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0