Divide Queue to Separate Queues
You may need to move messages from one queue to another by routing key in cases such as the following:
When configuring multiple queues. If all your messages are stored in a single queue (like oro.default), you may want to configure more than one queue and split the messages among them.
When restoring from a backup. You may need to exclude messages added after the backup (such as email notifications, third-party application updates, etc.).
Use the Shovel plugin to divide your single queue into separate queues.
Procedure
Example
The example below illustrates how to consume messages from the oro.default queue on the local broker and push them to separate queues, oro.email and oro.notification. All other messages are returned to the oro.default queue.
Use the following rabbitmqadmin commands, provided by the RabbitMQ management plugin:
declare queue
declare exchange
declare binding
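Every rabbitmqadmin call in this example repeats the same connection flags. As a convenience, they could be wrapped in a small shell function. This is only a sketch: rmqadmin is a hypothetical name that is not part of RabbitMQ, and it assumes HOST, USER, PASSWORD, and VHOST are already set in the current shell, as the commands in this example expect.

```shell
# Hypothetical wrapper (not part of RabbitMQ): forwards its arguments to
# rabbitmqadmin together with the connection flags used throughout this example.
# Assumes HOST, USER, PASSWORD and VHOST are set in the current shell.
rmqadmin() {
  rabbitmqadmin --host="$HOST" --user="$USER" --password="$PASSWORD" --vhost="$VHOST" "$@"
}

# Usage (needs a reachable broker, so it is left commented out):
# rmqadmin declare queue name="oro.temporary" durable=true arguments='{"x-max-priority": 4}'
```

Most login shells set USER to the current account name, so it may be worth confirming that it holds the RabbitMQ user before running any of the commands.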
Configure Temporary Queue
Create the oro.temporary queue on your host with durable=true and x-max-priority=4.
rabbitmqadmin declare queue --host=$HOST --user=$USER --password=$PASSWORD --vhost=$VHOST \
name="oro.temporary" durable=true arguments='{"x-max-priority": 4}'
Relocate Messages to Temporary Queue
Declare the oro.temporary exchange with the fanout type so that messages can be moved to the oro.temporary queue with their original routing_key preserved.
rabbitmqadmin declare exchange --host=$HOST --user=$USER --password=$PASSWORD --vhost=$VHOST \
name="oro.temporary" type="fanout" durable=true
Bind the oro.temporary exchange to the oro.temporary queue. All messages from this exchange will be transferred to the oro.temporary queue.
rabbitmqadmin declare binding --host=$HOST --user=$USER --password=$PASSWORD --vhost=$VHOST \
source="oro.temporary" destination="oro.temporary" destination_type=queue
Create a dynamic shovel to relocate all messages from the oro.default queue to the oro.temporary queue.
rabbitmqctl set_parameter shovel temporary-queue \
'{"src-protocol": "amqp091", "src-uri": "amqp://localhost:5672", "src-queue": "oro.default", '\
'"dest-protocol": "amqp091", "dest-uri": "amqp://localhost:5672", "dest-exchange": "oro.temporary", '\
'"src-delete-after": "queue-length"}'
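The shovel definition above is assembled from three quoted fragments, which is easy to mistype. One possible alternative is to generate the JSON with a helper. In the sketch below, shovel_definition is a hypothetical function, the default URI is taken from the command above, and no JSON escaping is attempted, so it only suits simple queue and exchange names.

```shell
# Hypothetical helper: prints a dynamic shovel definition as a single JSON string.
#   $1 = source queue, $2 = destination exchange,
#   $3 = broker URI (optional; defaults to the URI used in this example)
# No JSON escaping is performed on the arguments.
shovel_definition() {
  uri=${3:-amqp://localhost:5672}
  printf '{"src-protocol": "amqp091", "src-uri": "%s", "src-queue": "%s", "dest-protocol": "amqp091", "dest-uri": "%s", "dest-exchange": "%s", "src-delete-after": "queue-length"}\n' \
    "$uri" "$1" "$uri" "$2"
}

# Usage (needs a broker with the Shovel plugin enabled, so it is left commented out):
# rabbitmqctl set_parameter shovel temporary-queue "$(shovel_definition oro.default oro.temporary)"
```

The URI in this example carries no virtual host, so it addresses the default one. If $VHOST is not the default virtual host, the URI and the rabbitmqctl call would presumably need to name it as well.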
Check if all messages are relocated.
rabbitmqctl shovel_status | grep "temporary-queue"
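Because the definition specifies src-delete-after=queue-length, the shovel deletes itself after transferring the number of messages that were in the source queue when it started, so it no longer appears in the status output once the move is complete.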
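As a cross-check, the message counts of the source and destination queues can be compared directly. rabbitmqctl list_queues name messages prints one queue per line, with the name and the message count separated by a tab. The queue_depth function below is a hypothetical helper that picks one count out of that listing. It is a sketch, not part of the documented procedure.

```shell
# Hypothetical helper: reads "name<TAB>messages" lines on stdin and prints
# the message count of the queue named in $1 (prints nothing if it is absent).
queue_depth() {
  awk -F'\t' -v q="$1" '$1 == q { print $2 }'
}

# Usage (needs a running broker, so it is left commented out):
# rabbitmqctl -q list_queues -p "$VHOST" name messages | queue_depth oro.default
# rabbitmqctl -q list_queues -p "$VHOST" name messages | queue_depth oro.temporary
```

After a complete move, oro.default would be expected to report 0, unless producers are still publishing to it.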
Remove the oro.temporary exchange.
rabbitmqadmin delete exchange --host=$HOST --user=$USER --password=$PASSWORD --vhost=$VHOST \
name="oro.temporary"
Configure Alternate Exchange
Declare the oro.temporary.alternate exchange with the fanout type. It is used to return to the oro.default queue any message whose routing_key does not match a binding mask on the dividing exchange.
rabbitmqadmin declare exchange --host=$HOST --user=$USER --password=$PASSWORD --vhost=$VHOST \
name="oro.temporary.alternate" type="fanout" durable=true
Configure Dividing Exchange
Declare the oro.temporary.divide exchange with the topic type to be able to move messages to separate queues by routing_key. Specify the alternate-exchange=oro.temporary.alternate option, which marks the oro.temporary.alternate exchange as the alternate exchange for the oro.temporary.divide exchange.
rabbitmqadmin declare exchange --host=$HOST --user=$USER --password=$PASSWORD --vhost=$VHOST \
name="oro.temporary.divide" type="topic" durable=true \
arguments='{"alternate-exchange": "oro.temporary.alternate"}'
Create Additional Queues
Declare the oro.email queue with durable=true and x-max-priority=4.
rabbitmqadmin declare queue --host=$HOST --user=$USER --password=$PASSWORD --vhost=$VHOST \
name=oro.email durable=true arguments='{"x-max-priority": 4}'
Declare the oro.notification queue with durable=true and x-max-priority=4.
rabbitmqadmin declare queue --host=$HOST --user=$USER --password=$PASSWORD --vhost=$VHOST \
name=oro.notification durable=true arguments='{"x-max-priority": 4}'
Configure Bindings
Bind the oro.temporary.alternate exchange with the oro.default queue. All messages from this exchange will be transferred to the oro.default queue.
rabbitmqadmin declare binding --host=$HOST --user=$USER --password=$PASSWORD --vhost=$VHOST \
source="oro.temporary.alternate" destination="oro.default" destination_type=queue
Bind the oro.temporary.divide exchange with the oro.email queue by routing_key=oro.email.#
rabbitmqadmin declare binding --host=$HOST --user=$USER --password=$PASSWORD --vhost=$VHOST \
source="oro.temporary.divide" destination="oro.email" destination_type=queue \
routing_key="oro.email.#"
Bind the oro.temporary.divide exchange with the oro.notification queue by routing_key=oro.notification.#
rabbitmqadmin declare binding --host=$HOST --user=$USER --password=$PASSWORD --vhost=$VHOST \
source="oro.temporary.divide" destination="oro.notification" destination_type=queue \
routing_key="oro.notification.#"
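To see which queue a given message would end up in, the bindings above can be approximated locally. The sketch below is a simulation only: it contacts no broker, route_for is a hypothetical function, and the sample routing keys are made up for illustration. It relies on the topic rule that # matches zero or more dot-separated words, and on the alternate exchange receiving whatever no binding matches.

```shell
# Simulation only: approximates the topic bindings on oro.temporary.divide
# and the fallback through oro.temporary.alternate. No broker is contacted.
route_for() {
  case "$1" in
    # "oro.email.#" matches "oro.email" followed by zero or more dot-separated words
    oro.email | oro.email.*) echo "oro.email" ;;
    # "oro.notification.#" works the same way
    oro.notification | oro.notification.*) echo "oro.notification" ;;
    # no binding matched: the alternate exchange hands the message to oro.default
    *) echo "oro.default" ;;
  esac
}

# Sample routing keys (hypothetical) and the queue each one would reach
for key in oro.email.send oro.notification.alert oro.emails oro.index; do
  printf '%s -> %s\n' "$key" "$(route_for "$key")"
done
```

Note that a key such as oro.emails falls through to oro.default, because the mask requires a word boundary after oro.email.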
Relocate Messages to Separate Queues
Create a dynamic shovel to relocate all messages from the oro.temporary queue to the oro.temporary.divide exchange.
rabbitmqctl set_parameter shovel divide-queue \
'{"src-protocol": "amqp091", "src-uri": "amqp://localhost:5672", "src-queue": "oro.temporary", '\
'"dest-protocol": "amqp091", "dest-uri": "amqp://localhost:5672", "dest-exchange": "oro.temporary.divide", '\
'"ack-mode": "on-publish", "src-delete-after": "queue-length"}'
Check if all messages are relocated.
rabbitmqctl shovel_status | grep "divide-queue"
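Instead of rerunning the status command by hand, the check could be polled until the shovel is no longer listed. The sketch below assumes that a shovel created with src-delete-after=queue-length disappears from the shovel_status output once it has finished. wait_for_shovel and its retry and delay parameters are hypothetical. A shovel that never started also counts as absent, so this is not proof that messages were moved.

```shell
# Hypothetical helper: polls `rabbitmqctl shovel_status` until the named
# shovel is no longer listed.
#   $1 = shovel name, $2 = max attempts (default 30), $3 = delay in seconds (default 2)
# Returns 0 when the shovel is gone, 1 on timeout, 2 if the status cannot be read.
wait_for_shovel() {
  name=$1
  tries=${2:-30}
  delay=${3:-2}
  while [ "$tries" -gt 0 ]; do
    # Give up immediately if the status command itself fails.
    out=$(rabbitmqctl shovel_status) || return 2
    # The shovel is no longer listed: treat the transfer as finished.
    if ! printf '%s\n' "$out" | grep -qF -- "$name"; then
      return 0
    fi
    tries=$((tries - 1))
    sleep "$delay"
  done
  return 1
}

# Usage (needs a running broker, so it is left commented out):
# wait_for_shovel divide-queue && echo "divide-queue finished"
```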
Clear Temporary Exchanges and Queues
Remove the oro.temporary.alternate exchange.
rabbitmqadmin delete exchange --host=$HOST --user=$USER --password=$PASSWORD --vhost=$VHOST \
name="oro.temporary.alternate"
Remove the oro.temporary.divide exchange.
rabbitmqadmin delete exchange --host=$HOST --user=$USER --password=$PASSWORD --vhost=$VHOST \
name="oro.temporary.divide"
Remove the oro.temporary queue.
rabbitmqadmin delete queue --host=$HOST --user=$USER --password=$PASSWORD --vhost=$VHOST \
name="oro.temporary"
For more information, see the following external resources: