AWS上にk3sクラスタを構築したので、そこにMQTTサーバを導入したいと思います。MQTTサーバにはEMQXを使いたいと思います。ただ、一からStatefulSetを作るのには挫折してしまいました。helmを利用してインストールしたいと思います。
1.ソフトウェア構成
※AWS上の4インスタンスに構築したk3sクラスタ構成
Debian GNU/Linux 11
k3s version v1.23.8+k3s1 (53f2d4e7)
go version go1.17.5
Longhorn v1.3.0
Node-RED 2.2.2(latest)
EMQX v5.0.3(docker image emqx/emqx:latest)
2.インストール手順
helmレポジトリ追加
helmレポジトリを追加し確認します。
$ helm repo add emqx https://repos.emqx.io/charts
$ helm repo update
$ helm search repo emqx
NAME CHART VERSION APP VERSION DESCRIPTION
emqx/emqx v4.0.0 v4.0.0 A Helm chart for EMQX
emqx/emqx-ee v4.0.0 v4.0.0 A Helm chart for EMQX
emqx/kuiper 0.1.1 0.1.1 A lightweight IoT edge analytic software
valuesファイルの編集
インストール時にカスタマイズしたいのでvaluesファイルを取得します。
$ helm inspect values emqx/emqx > emqx_values.yaml
変更前
変更前のvaluesファイルを以下に示します。
affinity: {}
containerSecurityContext:
enabled: true
runAsNonRoot: true
runAsUser: 1000
emqxConfig:
EMQX_CLUSTER__DISCOVERY_STRATEGY: k8s
EMQX_CLUSTER__K8S__ADDRESS_TYPE: hostname
EMQX_CLUSTER__K8S__APISERVER: https://kubernetes.default.svc:443
EMQX_CLUSTER__K8S__NAMESPACE: '{{ .Release.Namespace }}'
EMQX_CLUSTER__K8S__SERVICE_NAME: '{{ .Release.Name }}-headless'
EMQX_CLUSTER__K8S__SUFFIX: svc.cluster.local
EMQX_DASHBOARD__DEFAULT_PASSWORD: public
EMQX_DASHBOARD__DEFAULT_USERNAME: admin
emqxLicenseSecretName: null
image:
pullPolicy: IfNotPresent
repository: emqx/emqx
ingress:
dashboard:
annotations: {}
enabled: false
hosts:
- dashboard.emqx.local
path: /
pathType: ImplementationSpecific
tls: []
initContainers: {}
metrics:
enabled: false
type: prometheus
nodeSelector: {}
persistence:
accessMode: ReadWriteOnce
enabled: false
size: 20Mi
storageClassName: ""
podAnnotations: {}
podManagementPolicy: Parallel
podSecurityContext:
enabled: true
fsGroup: 1000
fsGroupChangePolicy: Always
runAsUser: 1000
supplementalGroups:
- 1000
recreatePods: false
replicaCount: 3
resources: {}
service:
annotations: {}
dashboard: 18083
externalIPs: []
loadBalancerSourceRanges: []
mgmt: 8081
mqtt: 1883
mqttssl: 8883
nodePorts:
dashboard: null
dashboardtls: null
mgmt: null
mqtt: null
mqttssl: null
ws: null
wss: null
type: ClusterIP
ws: 8083
wss: 8084
tolerations: []
変更点
valuesファイルを変更していきます。まず、MQTTサーバを動かすノードを指定したかったので、nodeSelectorを使用します。
nodeSelector:
longhorn-node: 'true'
またせっかくlonghornを導入しているので、longhornのボリュームを利用するよう以下の指定を行いました。どれだけ意味のある事かは分かりませんが(笑)
persistence:
accessMode: ReadWriteOnce
enabled: true
size: 20Mi
storageClassName: longhorn
変更後
変更後のvaluesファイルを以下に示します。
affinity: {}
containerSecurityContext:
enabled: true
runAsNonRoot: true
runAsUser: 1000
emqxConfig:
EMQX_CLUSTER__DISCOVERY_STRATEGY: k8s
EMQX_CLUSTER__K8S__ADDRESS_TYPE: hostname
EMQX_CLUSTER__K8S__APISERVER: https://kubernetes.default.svc:443
EMQX_CLUSTER__K8S__NAMESPACE: '{{ .Release.Namespace }}'
EMQX_CLUSTER__K8S__SERVICE_NAME: '{{ .Release.Name }}-headless'
EMQX_CLUSTER__K8S__SUFFIX: svc.cluster.local
EMQX_DASHBOARD__DEFAULT_PASSWORD: public
EMQX_DASHBOARD__DEFAULT_USERNAME: admin
emqxLicenseSecretName: null
image:
pullPolicy: IfNotPresent
repository: emqx/emqx
ingress:
dashboard:
annotations: {}
enabled: false
hosts:
- dashboard.emqx.local
path: /
pathType: ImplementationSpecific
tls: []
initContainers: {}
metrics:
enabled: false
type: prometheus
nodeSelector:
longhorn-node: 'true'
persistence:
accessMode: ReadWriteOnce
enabled: true
size: 20Mi
storageClassName: longhorn
podAnnotations: {}
podManagementPolicy: Parallel
podSecurityContext:
enabled: true
fsGroup: 1000
fsGroupChangePolicy: Always
runAsUser: 1000
supplementalGroups:
- 1000
recreatePods: false
replicaCount: 3
resources: {}
service:
annotations: {}
dashboard: 18083
externalIPs: []
loadBalancerSourceRanges: []
mgmt: 8081
mqtt: 1883
mqttssl: 8883
nodePorts:
dashboard: null
dashboardtls: null
mgmt: null
mqtt: null
mqttssl: null
ws: null
wss: null
type: ClusterIP
ws: 8083
wss: 8084
tolerations: []
emqxインストールと動作確認
変更したemqx_values.yamlを-fオプションにとって、emqxをインストールします。
$helm install -f .\emqx_values.yaml emqx-cluster emqx/emqx
インストール後、動作確認をしてみます。Cluster statusに三つのノードが登録されていれば、クラスタが成立できています。
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
emqx-cluster-0 1/1 Running 0 51s
emqx-cluster-1 1/1 Running 0 52s
emqx-cluster-2 1/1 Running 0 55s
$ kubectl exec -it emqx-cluster-0 -- emqx_ctl status
Node 'emqx@emqx-cluster-0.emqx-cluster-headless.default.svc.cluster.local' 5.0.3 is started
$ kubectl exec -it emqx-cluster-0 -- emqx_ctl cluster status
Cluster status: #{running_nodes =>
['emqx@emqx-cluster-0.emqx-cluster-headless.default.svc.cluster.local',
'emqx@emqx-cluster-1.emqx-cluster-headless.default.svc.cluster.local',
'emqx@emqx-cluster-2.emqx-cluster-headless.default.svc.cluster.local'],
stopped_nodes => []}
emqxのクラスタはどうアドレス解決されるのかも見てみましょう。
kubectl get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S)
AGE
emqx-cluster ClusterIP 10.43.135.251 <none> 1883/TCP,8883/TCP,8083/TCP,8084/TCP,18083/TCP
23h
emqx-cluster-headless ClusterIP None <none> 1883/TCP,8883/TCP,8083/TCP,8084/TCP,18083/TCP,4370/TCP 23h
kubernetes ClusterIP 10.43.0.1 <none> 443/TCP
19d
nodered NodePort 10.43.158.49 <none> 1880:31880/TCP
23h
emqx-cluster
と指定すればよさそうです。
3.Node-redとEMQXを連携させる
Node-redですこぶる単純なフローを作って、すこぶる簡単な動作確認をしてみましょう。
フローを作る
タイムスタンプを/testのトピックにpublishし、それを購読して形式変換してdebugノードにつなぐだけの、簡単なフローを作ります。
MQTTノードでMQTTサーバの設定を行います。先ほど確認したように、サーバの項にemqx-cluster
と指定することで、MQTTサーバにつなぐことができます。
MQTTサーバへのpub/sebノードはそれぞれ以下の通りに設定しました。
タイムスタンプノードのボタンを押すとメッセージが発行されます。一回押すごとにdebugウィンドウに二つデバッグメッセージが表示されたら成功です。
フローのデータ
フローを書き出した結果を以下に示します。これを取り込めばフローを再現できますが、この程度のフローならフローエディタ上で一から作ってもたいした事ないかもしれませんね。
[
{
"id": "18a5817d19127e87",
"type": "tab",
"label": "フロー 1",
"disabled": false,
"info": "",
"env": []
},
{
"id": "81dda3a8d2de1bd0",
"type": "inject",
"z": "18a5817d19127e87",
"name": "",
"props": [
{
"p": "payload"
},
{
"p": "topic",
"vt": "str"
}
],
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "",
"payload": "",
"payloadType": "date",
"x": 160,
"y": 120,
"wires": [
[
"44048fa656dee19c",
"40cd85ebdf0572d7"
]
]
},
{
"id": "44048fa656dee19c",
"type": "mqtt out",
"z": "18a5817d19127e87",
"name": "",
"topic": "/test",
"qos": "2",
"retain": "true",
"respTopic": "",
"contentType": "",
"userProps": "",
"correl": "",
"expiry": "",
"broker": "b9dd513617f7e747",
"x": 810,
"y": 120,
"wires": []
},
{
"id": "85f5fa3d75e2e169",
"type": "mqtt in",
"z": "18a5817d19127e87",
"name": "",
"topic": "/test",
"qos": "2",
"datatype": "auto-detect",
"broker": "b9dd513617f7e747",
"nl": false,
"rap": true,
"rh": 0,
"inputs": 0,
"x": 130,
"y": 240,
"wires": [
[
"e440be6a4380957f"
]
]
},
{
"id": "40cd85ebdf0572d7",
"type": "debug",
"z": "18a5817d19127e87",
"name": "debug 1",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "false",
"statusVal": "",
"statusType": "auto",
"x": 820,
"y": 180,
"wires": []
},
{
"id": "e440be6a4380957f",
"type": "moment",
"z": "18a5817d19127e87",
"name": "",
"topic": "",
"input": "",
"inputType": "msg",
"inTz": "Asia/TOKYO",
"adjAmount": 0,
"adjType": "days",
"adjDir": "add",
"format": "YYYY/MM/DD HH:mm:ss",
"locale": "ja-JP",
"output": "",
"outputType": "msg",
"outTz": "Asia/TOKYO",
"x": 420,
"y": 240,
"wires": [
[
"a8905cccf8e845e8"
]
]
},
{
"id": "a8905cccf8e845e8",
"type": "debug",
"z": "18a5817d19127e87",
"name": "debug 2",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "false",
"statusVal": "",
"statusType": "auto",
"x": 820,
"y": 240,
"wires": []
},
{
"id": "b9dd513617f7e747",
"type": "mqtt-broker",
"name": "emqx-cluster",
"broker": "emqx-cluster",
"port": "1883",
"clientid": "",
"autoConnect": true,
"usetls": false,
"protocolVersion": "5",
"keepalive": "60",
"cleansession": true,
"birthTopic": "",
"birthQos": "0",
"birthPayload": "",
"birthMsg": {},
"closeTopic": "",
"closeQos": "0",
"closePayload": "",
"closeMsg": {},
"willTopic": "",
"willQos": "0",
"willPayload": "",
"willMsg": {},
"userProps": "",
"sessionExpiry": ""
}
]