ここ数日RabbitMQとrabbitmq-delayed-message-exchangeとElixirを組み合わせたサンプルを書くということをしている
ようやっとサンプルが動いたのでブログを書く
やりたいこと
github.com
やりたいことは、このPluginを使って指定した秒数待ったあとにメッセージを送るというもの
ここのReadmeに使い方が書いてあるのだけど、初心者の僕には、Javaっぽい言語のソースコードが載っているだけでイマイチピンと来なかった。
この記事ではRabbitMQでdelayed-message-exchangeのpluginだけインストールした状態から、実際にElixirでメッセージ送信するためにやったことを紹介する。
Exchange設定
こんな感じにRabbitMQ managementからExchangeを設定する。
Elixirプログラム
受信側
defmodule Consumer do
use GenServer
use AMQP
def start_link do
GenServer.start_link(__MODULE__, [], [])
end
@exchange "my-exchange"
@queue "gen_server_test_queue"
@queue_error "#{@queue}_error"
def init(_opts) do
{:ok, conn} = Connection.open
{:ok, chan} = Channel.open(conn)
# Limit unacknowledged messages to 10
Basic.qos(chan, prefetch_count: 10)
Queue.declare(chan, @queue_error, durable: true)
# Messages that cannot be delivered to any consumer in the main queue will be routed to the error queue
Queue.declare(chan, @queue, durable: true,
arguments: [{"x-dead-letter-exchange", :longstr, ""},
{"x-dead-letter-routing-key", :longstr, @queue_error}])
Queue.bind(chan, @queue, @exchange)
# Register the GenServer process as a consumer
{:ok, _consumer_tag} = Basic.consume(chan, @queue)
{:ok, chan}
end
# Confirmation sent by the broker after registering this process as a consumer
def handle_info({:basic_consume_ok, %{consumer_tag: consumer_tag}}, chan) do
{:noreply, chan}
end
# Sent by the broker when the consumer is unexpectedly cancelled (such as after a queue deletion)
def handle_info({:basic_cancel, %{consumer_tag: consumer_tag}}, chan) do
{:stop, :normal, chan}
end
# Confirmation sent by the broker to the consumer process after a Basic.cancel
def handle_info({:basic_cancel_ok, %{consumer_tag: consumer_tag}}, chan) do
{:noreply, chan}
end
def handle_info({:basic_deliver, payload, %{delivery_tag: tag, redelivered: redelivered}}, chan) do
spawn fn -> consume(chan, tag, redelivered, payload) end
{:noreply, chan}
end
defp consume(channel, tag, redelivered, payload) do
number = String.to_integer(payload)
if number <= 10 do
Basic.ack channel, tag
IO.puts "Consumed a #{number}."
else
Basic.reject channel, tag, requeue: false
IO.puts "#{number} is too big and was rejected."
end
rescue
# Requeue unless it's a redelivered message.
# This means we will retry consuming a message once in case of exception
# before we give up and have it moved to the error queue
#
# You might also want to catch :exit signal in production code.
# Make sure you call ack, nack or reject otherwise comsumer will stop
# receiving messages.
exception ->
Basic.reject channel, tag, requeue: not redelivered
IO.puts "Error converting #{payload} to integer"
end
end
送信側
defmodule RabbitmqTutorials do
@queue "gen_server_test_queue"
@exchange "my-exchange"
def hello do
{:ok, connection} = AMQP.Connection.open
{:ok, channel} = AMQP.Channel.open(connection)
AMQP.Basic.publish(channel, @exchange, "", "5", headers: [{"x-delay", 5000}])
AMQP.Connection.close(connection)
end
end
あくまでサンプルなので、ここからもっと洗練する必要がある
今回の目的は「サンプルを書いて動かしてみる」なので以上
躓いた所
AMQP
RabbitMQではAMQPというプロトコルを使用するのですが、中途半端な理解なまま先に進んだためわけわからなくなった。
AMQPについてはGreeの技術記事がすごくわかりやすかった
AMQPによるメッセージング | GREE Engineers' Blog
AMQP.Exchange#declare/4
ElixirのAMQPクライアントには AMQP.Exchange#declare/4
という関数があって、これを使えばExchangeを登録できる
AMQP.Exchange – amqp v0.3.0
delayed-message-exchange
を使うには typeに x-delayed-message
を入れないといけない
AMQP.Exchange#declare/4
のtypeはAtomを入れるようになっていて x-delayed-message
をAtomで書きたいんだけど、 -
が付いているもんだからAtomが作れない
仕方がないからRabbitMQ managementで登録することにした
RabbitMQ management exchange登録
なんもわからずに登録しようとしたらこんなこと言われた
406 PRECONDITION_FAILED - Invalid argument, 'x-delayed-type' must be an existing exchange type
arguments
に x-delayed-type
入れろとのこと
感想
ちゃんと理解してから先に進むの大事だなと思った