Message Queue Topics

Each message that is going to be sent to MQ needs a topic that denotes what kind of data it contains, so such message could be appropriately handled by a message consumer. In other words, a topic is a message type.

Declaring MQ Topic

To declare MQ topic, create a class that implements Oro\Component\MessageQueue\Topic\TopicInterface and register it as a service with tag oro_message_queue.topic. You have to declare the following methods:

  • getName - to provide a topic name that will be used within the message queue transport. It usually consists of lowercase characters, digits, dots, and underscores.
  • getDescription - to provide a human-readable topic description. It is usually not longer than 80 characters.
  • getDefaultPriority - to provide a default priority for a message in the specified queue. It must return a constant from Oro\Component\MessageQueue\Client\MessagePriority.
  • configureMessageBody - to configure Symfony\Component\OptionsResolver\OptionsResolver that is used to validate the message body structure: provide a set of defined and required elements, their default values, allowed types and values, etc.

Note

You can use Oro\Component\MessageQueue\Topic\AbstractTopic as the base class for a new topic.

Note

You do not have to explicitly declare the topic as a tagged service if the autoconfigure service container setting is on.

An example of an MQ topic:

Async/Topic/SendAutoResponseTopic.php
 namespace Oro\Bundle\EmailBundle\Async\Topic;

 use Oro\Component\MessageQueue\Topic\AbstractTopic;
 use Symfony\Component\OptionsResolver\OptionsResolver;

 class SendAutoResponseTopic extends AbstractTopic
 {
     public static function getName(): string
     {
         return 'oro.email.send_auto_response';
     }

     public static function getDescription(): string
     {
         return 'Send auto response for single email';
     }

     public function configureMessageBody(OptionsResolver $resolver): void
     {
         $resolver
             ->setRequired([
                 'jobId',
                 'id'
             ])
             ->addAllowedTypes('jobId', 'int')
             ->addAllowedTypes('id', 'int');
     }
 }
Resources/config/mq_topics.yml
 services:
     _defaults:
         tags:
             - { name: oro_message_queue.topic }

     Oro\Bundle\EmailBundle\Async\Topic\SendAutoResponsesTopic: ~

Message Queue Topics Registry

A message queue topics registry Oro\Component\MessageQueue\Topic\TopicRegistry collects all registered MQ topics. Use this service as the main entry point to get an MQ topic object:

$topic = $this->topicRegistry->get(RootJobStoppedTopic::getName());

Note

Registry returns the Oro\Component\MessageQueue\Topic\NullTopic object if the topic is not registered.

Message Body Validation

Before a message is pushed to the message queue, the message producer validates its message body structure according to the options defined by the message topic in the Oro\Component\MessageQueue\Topic\TopicInterface::configureMessageBody() method. A message that contains an invalid body is not be pushed to the message queue; instead, a corresponding error message is logged.

Note

When %kernel.debug% parameter is true, the message producer throws an exception Oro\Component\MessageQueue\Consumption\Exception\InvalidMessageBodyException.

The message body is also validated during consumption by the MQ extension Oro\Component\MessageQueue\Client\ConsumptionExtension\MessageBodyResolverExtension. A message that contains an invalid body is rejected with a corresponding error log message before it gets to the message processor.

Note

Message body validation is done by Oro\Component\MessageQueue\Client\MessageBodyResolver.

Message body validation ensures that the structure of each message is correct before it comes to the message processor.