Important

You are browsing documentation for version 5.0 of OroCommerce, OroCRM, and OroPlatform, maintained until August 2024 and supported until March 2026. See version 5.1 (the latest LTS version) of the Oro documentation for information on latest features.

See our Release Process documentation for more information on the currently supported and upcoming releases.

Message Queue Topics

Each message that is going to be sent to MQ needs a topic that denotes what kind of data it contains, so such message could be appropriately handled by a message consumer. In other words, a topic is a message type.

Declaring MQ Topic

To declare MQ topic, create a class that implements Oro\Component\MessageQueue\Topic\TopicInterface and register it as a service with tag oro_message_queue.topic. You have to declare the following methods:

  • getName - to provide a topic name that will be used within the message queue transport. It usually consists of lowercase characters, digits, dots, and underscores.
  • getDescription - to provide a human-readable topic description. It is usually not longer than 80 characters.
  • getDefaultPriority - to provide a default priority for a message in the specified queue. It must return a constant from Oro\Component\MessageQueue\Client\MessagePriority.
  • configureMessageBody - to configure Symfony\Component\OptionsResolver\OptionsResolver that is used to validate the message body structure: provide a set of defined and required elements, their default values, allowed types and values, etc.

Note

You can use Oro\Component\MessageQueue\Topic\AbstractTopic as the base class for a new topic.

Note

You do not have to explicitly declare the topic as a tagged service if the autoconfigure service container setting is on.

An example of an MQ topic:

Async/Topic/SendAutoResponseTopic.php
 namespace Oro\Bundle\EmailBundle\Async\Topic;

 use Oro\Component\MessageQueue\Topic\AbstractTopic;
 use Symfony\Component\OptionsResolver\OptionsResolver;

 class SendAutoResponseTopic extends AbstractTopic
 {
     public static function getName(): string
     {
         return 'oro.email.send_auto_response';
     }

     public static function getDescription(): string
     {
         return 'Send auto response for single email';
     }

     public function configureMessageBody(OptionsResolver $resolver): void
     {
         $resolver
             ->setRequired([
                 'jobId',
                 'id'
             ])
             ->addAllowedTypes('jobId', 'int')
             ->addAllowedTypes('id', 'int');
     }
 }
Resources/config/mq_topics.yml
 services:
     _defaults:
         tags:
             - { name: oro_message_queue.topic }

     Oro\Bundle\EmailBundle\Async\Topic\SendAutoResponsesTopic: ~

Message Queue Topics Registry

A message queue topics registry Oro\Component\MessageQueue\Topic\TopicRegistry collects all registered MQ topics. Use this service as the main entry point to get an MQ topic object:

$topic = $this->topicRegistry->get(RootJobStoppedTopic::getName());

Note

Registry returns the Oro\Component\MessageQueue\Topic\NullTopic object if the topic is not registered.

Message Body Validation

Before a message is pushed to the message queue, the message producer validates its message body structure according to the options defined by the message topic in the Oro\Component\MessageQueue\Topic\TopicInterface::configureMessageBody() method. A message that contains an invalid body is not be pushed to the message queue; instead, a corresponding error message is logged.

Note

When %kernel.debug% parameter is true, the message producer throws an exception Oro\Component\MessageQueue\Consumption\Exception\InvalidMessageBodyException.

The message body is also validated during consumption by the MQ extension Oro\Component\MessageQueue\Client\ConsumptionExtension\MessageBodyResolverExtension. A message that contains an invalid body is rejected with a corresponding error log message before it gets to the message processor.

Note

Message body validation is done by Oro\Component\MessageQueue\Client\MessageBodyResolver.

Message body validation ensures that the structure of each message is correct before it comes to the message processor.