Pulsar
こんにちは。ヤフー株式会社システム統括本部の木村です。
私の所属するteamではopen-sourceのpub-sub型messaging platformであるApache Pulsarを社内向けに開発, 運用しています。
弊teamでは過去に幾つかPulsarに関する記事を書いてきました。
- メッセージングPF「Apache Pulsar」の使い方(入門編)
- メッセージングPF「Apache Pulsar」の使い方(クライアント編)
- メッセージングPF「Apache Pulsar」の使い方(クライアント編2)
- Pulsar Summitのセッションと、ヤフーの発表内容紹介
- メッセージングPF「Apache Pulsar」の使い方(サーバー編)
Pulsarがどのようなsoftwareなのかについては上の記事を御参照ください。
本稿では、Pulsarの機能の一つであるPulsar Functionsを紹介し、これをKubernetes上で動作させるtutorialを提供します。
Pulsar Functions
Pulsar Functionsを利用するとuserはPulsar cluster内にfunctionと呼ばれるprocessを作成できます。これは
- 入力topicにmessageが届くと
- userが
.jar
や.py
として実装した処理をmessageに施し - 出力topicに送る
と言う動作をします。
Pulsar Functionsを使用する事で、別個にsystemを用意する事無くPulsar単体でmessageを処理する事が可能になります。これはserverlessなFaaSを提供します(AWS Lambdaなどの影響を受けたようです)。
Pulsar FunctionsをKubernetesで使う
実際にPulsar Functionsを使ってみましょう。
本節では、Pulsar clusterのdeployからfunctionの作成, 利用までを行います。
本節の手順はMacとCentOS上で確認しましたが、Ubuntuなどの他Linux distributionでも動作します。
概ね公式document通りですが、一部注意が必要な箇所が有るために追記, 修正してあります。
事前準備
次のsoftwareがinstallされている必要が有ります。
- Docker
- Kubectl
- Minkube
- Helm
Macでのinstallation例
$ brew install docker kubectl minikube helm
$ brew cask install docker
CentOSでのinstallation例
# docker
# https://docs.docker.com/engine/install/centos/
$ sudo yum install -y yum-utils
$ sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
$ sudo yum install -y docker-ce
$ sudo systemctl start docker
$ sudo usermod -aG docker $USER && newgrp docker
# kubectl
# https://kubernetes.io/docs/tasks/tools/install-kubectl/
$ curl -LO "https://storage.googleapis.com/kubernetes-release/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/linux/amd64/kubectl"
$ chmod +x ./kubectl
$ sudo mv ./kubectl /usr/local/bin/kubectl
$ kubectl version --client
# minikube
# https://minikube.sigs.k8s.io/docs/start/
$ curl -Lo minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
$ chmod +x minikube
# helm
# https://helm.sh/docs/intro/install/
$ curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3
$ chmod 700 get_helm.sh
$ ./get_helm.sh
MinikubeでKubernetes clusterを準備する
本解説ではMinikube 1 を利用します。
既に有るclusterを使う事も可能ですが、本解説通りの手順では動作しない場合が有ります。
# Dockerを起動しておきます
# Minikubeを起動します(defaultでDockerがdriverになります)
$ minikube start --cpus=4
🏄 Done! kubectl is now configured to use "minikube" cluster and "default" namespace by default
# Minikubeをcontextとして設定します
$ kubectl config use-context minikube
Pulsarをdeployする
上で起動したKubernetes clusterにPulsarをdeployします。
deploymentにはHelm 2 のchartが公式に用意されているので、それを利用します。
$ git clone https://github.com/apache/pulsar-helm-chart -b pulsar-2.6.2-2 --depth 1
$ cd pulsar-helm-chart
# Pulsarのdeploymentに使うsecretを生成します
# namespaceもこれで作成されます
# usage: -n <k8s namespace> -k <pulsar release name> -c
$ scripts/pulsar/prepare_helm_release.sh -n pulsar -k pulsar-mini -c
# Minikube用に用意された設定を利用してdeployします
$ helm install pulsar-mini ./charts/pulsar \
-f examples/values-minikube.yaml \
--set initialize=true \
-n pulsar --create-namespace
# functionを作るのに必要な権限をbrokerに与えます
$ kubectl apply -f - <<EOF
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: "pulsar-mini-functions-worker"
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: "pulsar-mini-functions-worker"
subjects:
- kind: ServiceAccount
name: "pulsar-mini-broker-acct"
namespace: pulsar
EOF
# deploymentが完了したか確認します
# 全てのSTATUSがRunningかCompletedになるまで待ちます
$ kubectl get pods -n pulsar
NAME READY STATUS RESTARTS AGE
pulsar-mini-bookie-0 1/1 Running 0 30m
pulsar-mini-bookie-init-q87qw 0/1 Completed 0 30m
pulsar-mini-broker-0 1/1 Running 0 30m
pulsar-mini-grafana-555cf54cf-kr74l 1/1 Running 0 30m
pulsar-mini-prometheus-5556dbb8b8-hjvbq 1/1 Running 0 30m
pulsar-mini-proxy-0 1/1 Running 0 30m
pulsar-mini-pulsar-init-l4whg 0/1 Completed 0 30m
pulsar-mini-pulsar-manager-6c6889dff-5qml9 1/1 Running 0 30m
pulsar-mini-toolset-0 1/1 Running 0 30m
pulsar-mini-zookeeper-0 1/1 Running 0 30m
bookieやbrokerなどのcomponentがpodとしてdeployされました。
ここで利用したvalues-minikube.yaml
はMinikubeへのdeployment用に最小限の設定値が既述されたfileです。違う設定でdeployしたい場合、詳細はHelm chartのREADMEや公式documentを御確認ください。
produce-consume確認
Pulsar clusterが正常に動作しているか確認するためにproduce
とconsume
を実行します。
# 接続先を確認します
$ minikube service pulsar-mini-proxy -n pulsar
|-----------|-------------------|-------------|---------------------------|
| NAMESPACE | NAME | TARGET PORT | URL |
|-----------|-------------------|-------------|---------------------------|
| pulsar | pulsar-mini-proxy | http/80 | http://192.168.49.2:30538 |
| | | pulsar/6650 | http://192.168.49.2:31318 |
|-----------|-------------------|-------------|---------------------------|
🏃 Starting tunnel for service pulsar-mini-proxy.
|-----------|-------------------|-------------|------------------------|
| NAMESPACE | NAME | TARGET PORT | URL |
|-----------|-------------------|-------------|------------------------|
| pulsar | pulsar-mini-proxy | | http://127.0.0.1:56975 |
| | | | http://127.0.0.1:56976 |
|-----------|-------------------|-------------|------------------------|
# 最下のURL(ここでは127.0.0.1:56976)を記録します
別のterminalを開き、subscribeします。
# pulsarをdownload, 解凍
$ cd ..
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.6.2/apache-pulsar-2.6.2-bin.tar.gz
$ tar xf apache-pulsar-2.6.2-bin.tar.gz
$ cd apache-pulsar-2.6.2
# consumerを作成してsubscribeします
# --urlの値は先ほど記録した物です
$ bin/pulsar-client --url pulsar://127.0.0.1:56976 consume persistent://public/default/test -s subscription
更に別のterminalを開きproduceします。
$ bin/pulsar-client --url pulsar://127.0.0.1:56976 produce persistent://public/default/test -m "hello"
subscribeしたterminalに戻りreceiveしているか確認します。
----- got message -----
key:[null], properties:[], content:hello
functionを作成
いよいよPulsar Functionsを利用します。
# pulsar-admin利用のためにtoolsetに入ります
$ kubectl exec -it -n pulsar pulsar-mini-toolset-0 -- /bin/bash
# 初めはfunctionが有りません
$ bin/pulsar-admin functions list
# 何も表示されない
# functionを作成します
# jar: functionsを実装したJARのpath
# classname: functionを実装したclass名
# fqfn: fully qualified function name (<tenant>/<namespace>/<function name>)
# inputs: 入力topic(comma区切り)
# output: 出力topic
# log-topic: log topic
$ bin/pulsar-admin functions create \
--jar examples/api-examples.jar \
--classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
--fqfn public/default/exclamation \
--inputs persistent://public/default/in \
--output persistent://public/default/out \
--log-topic persistent://public/default/log
"Created successfully"
# exclamationの名でfunctionが作成されています
$ bin/pulsar-admin functions list
"exclamation"
# functionの状態を確認します
# "runnning" : true であれば問題有りません
$ bin/pulsar-admin functions status --name exclamation
{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReceived" : 0,
"numSuccessfullyProcessed" : 0,
"numUserExceptions" : 0,
"latestUserExceptions" : [ ],
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"averageLatency" : 0.0,
"lastInvocationTime" : 0,
"workerId" : "c-pulsar-mini-fw-pulsar-mini-broker-0.pulsar-mini-broker.pulsar.svc.cluster.local-8080"
}
} ]
}
$ exit
# functionがpodとして作成されています
$ kubectl get pods -n pulsar
pf-public-default-exclamation-0 0/1 Runnning 0 30s # this!
pulsar-mini-bookie-0 1/1 Running 0 1h
pulsar-mini-bookie-init-q87qw 0/1 Completed 0 1h
pulsar-mini-broker-0 1/1 Running 0 1h
pulsar-mini-grafana-555cf54cf-kr74l 1/1 Running 0 1h
pulsar-mini-prometheus-5556dbb8b8-hjvbq 1/1 Running 0 1h
pulsar-mini-proxy-0 1/1 Running 0 1h
pulsar-mini-pulsar-init-l4whg 0/1 Completed 0 1h
pulsar-mini-pulsar-manager-6c6889dff-5qml9 1/1 Running 0 1h
pulsar-mini-toolset-0 1/1 Running 0 1h
pulsar-mini-zookeeper-0 1/1 Running 0 1h
ここで用いたJARは例として公式に準備された物です。
sourceを御覧になると判りますが、入力messageの末尾に!
を追加して出力する単純なmethodです。
当然、自前でfunctionを実装する事も可能です。Javaの場合はjava.util.Function
やorg.apache.pulsar.functions.api.Function
を実装したJARが必要になります(詳細)。Pythonにも対応しています。
functionを利用
作成したfunctionが機能していることを確認します。
# 出力topicにsubscribeします
$ bin/pulsar-client --url pulsar://127.0.0.1:56976 consume persistent://public/default/out -s subscription
別のterminalを開き入力topicにproduceします
$ bin/pulsar-client --url pulsar://127.0.0.1:56976 produce persistent://public/default/in -m "hello"
subscribeしたterminalに戻り、末尾に!
の付いたmessageをreceiveしていることを確認します。
----- got message -----
key:[null], properties:[__pfn_input_msg_id__=CAgQACAA, __pfn_input_topic__=persistent://public/default/in], content:hello!
Pulsar Functionsを利用してmessageにmethodを適用することができました。
後処理
最後に、本稿で導入したresourceなどを削除します。
$ helm delete pulsar-mini -n pulsar
$ kubectl delete --all all -n pulsar
$ minikube delete --all
複数開いたterminalは全て閉じます。
終わりに
本稿では、Pulsarの機能であり、FaaSを実現するPulsar Functionsの紹介と、それをMinikubeで動作させる手順について書きました。
ここで書き切れなかったPulsar Functionsの詳細については公式documentを御参照ください。
本稿がPulsarとPulsar Functionsの理解の一助となれば幸いです。