これはなに
EC2 on Kafkaをやる事がありそうだったので、その個人的な手順のメモです。
もし誰かの役に立てば幸いです。
対象のサーバにSSH
ssh -i ~/.ssh/鍵.pem ec2-user@<パブリックIP>
kafkaダウンロード
wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/3.0.0/kafka_2.13-3.0.0.tgz
tar -xzvf kafka_2.13-3.0.0.tgz
cd kafka_2.13-3.0.0/
Zookeeper起動
bin/zookeeper-server-start.sh config/zookeeper.properties
Kafka起動
bin/kafka-server-start.sh config/server.properties
Topic作成
bin/kafka-topics.sh --create --bootstrap-server=localhost:9092 --replication-factor 1 --partitions 1 --topic my-topic
Topic一覧表示
bin/kafka-topics.sh --list --bootstrap-server=localhost:9092
テスト送信
Topic送信
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic
Topic受信
bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092 --topic my-topic
外から叩く設定
kafkaサーバ設定
vi config/server.properties
書き換える
server.properties
#listeners=PLAINTEXT://:9092
↓
listeners=PLAINTEXT://<インスタンスのプライベートIP>:9092
プロデューサー側、コンシューマー側共通
venv
mkdir [project dir]
cd [project dir]
python3 -m venv venv
venv有効化
source venv/bin/activate
kafka-pythonインストール
pip install kafka-python
プロデューサー側設定
producer.py
from 'kafka-python' import KafkaProducer
producer = kafkapy.KafkaProducer(bootstrap_servers=['<kafkaサーバIP>:9092'])
v = producer.send('my-topic', b'test')
metadata = v.get(timeout=10)
print(v)
コンシューマー側設定
consumer.py
from kafka import KafkaConsumer, TopicPartition
consumer = KafkaConsumer('my-topic', bootstrap_servers=['<kafkaサーバIP>:9092'])
for msg in consumer:
print(msg)
ZookeeperとKafkaのsystemd作成
/usr/localへの移動とシンボリックリンクの作成
sudo mv kafka_2.13-3.0.0 /usr/local/.
cd /usr/local/
sudo ln -s kafka_2.13-3.0.0 kafka
zookeeper-server.service作成
zookeeper-server.service
#sudo vi /etc/systemd/system/zookeeper-server.service
[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
ExecStart=/usr/local/kafka_2.13-3.0.0/bin/zookeeper-server-start.sh /usr/local/kafka_2.13-3.0.0/config/zookeeper.properties
ExecStop=/usr/local/kafka_2.13-3.0.0/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target
kafka-server.service作成
kafka-server.service
#sudo vi /etc/systemd/system/kafka-server.service
[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target zookeeper-server.service
[Service]
Type=simple
Environment="JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk"
Environment="KAFKA_HEAP_OPTS=-Xmx256M -Xms128M"
ExecStart=/usr/local/kafka_2.13-3.0.0/bin/kafka-server-start.sh /usr/local/kafka_2.13-3.0.0/config/server.properties
ExecStop=/usr/local/kafka_2.13-3.0.0/bin/kafka-server-stop.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target
それぞれ有効化する
systemctl enable zookeeper-server
systemctl enable kafka-server
デーモン再起動
sudo systemctl daemon-reload