0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

[メモ]EC2 on Kafka手順

Posted at

これはなに

EC2 on Kafkaをやる事がありそうだったので、その個人的な手順のメモです。
もし誰かの役に立てば幸いです。

対象のサーバにSSH

ssh -i ~/.ssh/鍵.pem ec2-user@<パブリックIP>

kafkaダウンロード

wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/3.0.0/kafka_2.13-3.0.0.tgz
tar -xzvf kafka_2.13-3.0.0.tgz
cd kafka_2.13-3.0.0/

Zookeeper起動

bin/zookeeper-server-start.sh config/zookeeper.properties

Kafka起動

bin/kafka-server-start.sh config/server.properties

Topic作成

bin/kafka-topics.sh --create --bootstrap-server=localhost:9092 --replication-factor 1 --partitions 1 --topic my-topic

Topic一覧表示

 bin/kafka-topics.sh --list --bootstrap-server=localhost:9092

テスト送信

Topic送信

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic

Topic受信

bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092 --topic my-topic

外から叩く設定

イメージ
image.png

kafkaサーバ設定

vi config/server.properties

書き換える

server.properties
#listeners=PLAINTEXT://:9092

listeners=PLAINTEXT://<インスタンスのプライベートIP>:9092

プロデューサー側、コンシューマー側共通

venv

mkdir [project dir]
cd [project dir]
python3 -m venv venv

venv有効化

source venv/bin/activate

kafka-pythonインストール

pip install kafka-python

プロデューサー側設定

producer.py
from 'kafka-python' import KafkaProducer
producer = kafkapy.KafkaProducer(bootstrap_servers=['<kafkaサーバIP>:9092'])
v = producer.send('my-topic', b'test')
metadata = v.get(timeout=10)
print(v)

コンシューマー側設定

consumer.py
from kafka import KafkaConsumer, TopicPartition
consumer = KafkaConsumer('my-topic', bootstrap_servers=['<kafkaサーバIP>:9092'])
for msg in consumer:
    print(msg)

ZookeeperとKafkaのsystemd作成

/usr/localへの移動とシンボリックリンクの作成

sudo mv kafka_2.13-3.0.0 /usr/local/.
cd /usr/local/
sudo ln -s  kafka_2.13-3.0.0 kafka

zookeeper-server.service作成

zookeeper-server.service
#sudo vi /etc/systemd/system/zookeeper-server.service
[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
ExecStart=/usr/local/kafka_2.13-3.0.0/bin/zookeeper-server-start.sh /usr/local/kafka_2.13-3.0.0/config/zookeeper.properties
ExecStop=/usr/local/kafka_2.13-3.0.0/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

kafka-server.service作成

kafka-server.service
#sudo vi /etc/systemd/system/kafka-server.service
[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target zookeeper-server.service

[Service]
Type=simple
Environment="JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk"
Environment="KAFKA_HEAP_OPTS=-Xmx256M -Xms128M"
ExecStart=/usr/local/kafka_2.13-3.0.0/bin/kafka-server-start.sh /usr/local/kafka_2.13-3.0.0/config/server.properties
ExecStop=/usr/local/kafka_2.13-3.0.0/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target

それぞれ有効化する

systemctl enable zookeeper-server
systemctl enable kafka-server

デーモン再起動

sudo systemctl daemon-reload
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?