概要
とりあえずKafkaをDockerで立ち上げてローカルのRubyスクリプトから接続する手順を紹介します。実際にはKafkaクラスタの構成や設定、その他Kafkaをプログラムから使う上で把握しておいた方が良いことが多々ありますが、全て割愛します。
前提ソフトウェア
ソフトウェア | バージョン | 備考 |
---|---|---|
bitnami/kafka | 2.4.1-debian-10-r8 | Dockerイメージ |
bitnami/zookeeper | 3.6.0-debian-10-r12 | Dockerイメージ |
ruby | 2.6.3p62 | - |
ruby-kafka | 1.0.0 | - |
準備
docker-compose.yml
Kafkaが動作するにはZooKeeperも必要なのでこのエントリではdocker-composeを使います。Kafkaのイメージはbitnamiが提供しているDockerイメージを使います。KafkaのDockerイメージとしてはDL数も10M+で最もメジャーなものかと思います。
以下の内容をdocker-compose.yml
に記述して下さい。この内容はbitnamiが提供するdocker-compose.ymlに対してDocker Hubに書いてあるAccessing Kafka with internal and external clientsの変更を加えたものになります。
DockerボリュームにZooKeeper, Kafkaのデータを永続化していますので手順をゼロからやり直す場合はご注意下さい。プログラムからはローカルホストのポート29092
番でKafkaブローカーに接続できるように設定しています。
version: '2'
services:
zookeeper:
image: 'bitnami/zookeeper:3'
ports:
- '2181:2181'
volumes:
- 'zookeeper_data:/bitnami'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:2'
ports:
- '9092:9092'
- '29092:29092'
volumes:
- 'kafka_data:/bitnami'
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
depends_on:
- zookeeper
volumes:
zookeeper_data:
driver: local
kafka_data:
driver: local
Rubyスクリプト
Rubyのスクリプトを用意します。Kafkaクライアントのラッパーはいくつかありますが仕組みがわからないとドはまりするので最初は素のruby-kafka
を使うことを強くおすすめします。
Gemfile
Gemfile
を作ります。
bundle init
Gemfile
を編集して以下の1行を追加します。
gem "ruby-kafka"
Gemをインストールします。
bundle install
メッセージ送信スクリプト
produce.rb
として以下の内容を記述します。何も指定せずに接続して、時刻が入ったメッセージをtest_topic
に1回送るだけです。余計なことは一切しません。
require "kafka"
kafka = Kafka.new("127.0.0.1:29092")
kafka.deliver_message("Hello at #{Time.now.to_s}", topic: "test_topic")
メッセージ受信スクリプト
consume.rb
として以下の内容を記述します。test_topic
からメッセージを読み込み続けます。このスクリプトはブロッキングするので終了しません。
require "kafka"
kafka = Kafka.new("127.0.0.1:29092")
kafka.each_message(topic: "test_topic") do |message|
puts "offset: #{message.offset}"
puts "key: #{message.key}"
puts "value: #{message.value}"
end
以上で準備が整いました。
動作
Kafkaの起動
docker-compose
でZooKeeperとKafkaブローカーを起動します。起動には少し時間がかかりますのでログ出力が落ち着くまで待ちます。
docker-compose up
[2020-03-21 04:49:01,561] INFO [KafkaServer id=1001] started (kafka.server.KafkaServer)
というようなメッセージが出たら起動は終わっています。
Kafkaにメッセージを送信
Kafkaにメッセージを送信します。
bundle exec ruby produce.rb
Kafka::LeaderNotAvailable
エラーが出るかも知れませんが無視します。不安ならもう1回実行するとエラーなく終了するはずです。
Kafkaからメッセージを受信
Kafkaからメッセージを受信します。過去に送ったメッセージがすべて出力されます。
bundle exec ruby consume.rb
ここでもう1度prodduce.rb
でメッセージを送ってみると、起動中のconsume.rb
で同時にメッセージが受信されることが確認できます。