Message Queue Topics 

Each message sent to the message queue (MQ) needs a topic that denotes what kind of data it contains, so that a message consumer can handle the message appropriately. In other words, a topic is a message type.

Declaring MQ Topic 

To declare an MQ topic, create a class that implements Oro\Component\MessageQueue\Topic\TopicInterface and register it as a service with the oro_message_queue.topic tag. You have to declare the following methods:

  • getName - to provide a topic name that will be used within the message queue transport. It usually consists of lowercase characters, digits, dots, and underscores.

  • getDescription - to provide a human-readable topic description. It is usually not longer than 80 characters.

  • getDefaultPriority - to provide a default priority for a message in the specified queue. It must return a constant from Oro\Component\MessageQueue\Client\MessagePriority.

  • configureMessageBody - to configure Symfony\Component\OptionsResolver\OptionsResolver that is used to validate the message body structure: provide a set of defined and required elements, their default values, allowed types and values, etc.


You can use Oro\Component\MessageQueue\Topic\AbstractTopic as the base class for a new topic.


If the processor subscribed to a topic creates jobs, it is recommended that the topic implement Oro\Component\MessageQueue\Topic\JobAwareTopicInterface. In that case, you must also declare the createJobName method, which creates a unique job name for the topic. With this implementation, the job is created immediately after the message is created.


You do not have to explicitly declare the topic as a tagged service if the autoconfigure service container setting is on.
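
For comparison, a minimal service configuration sketch that relies on autoconfiguration might look as follows. The Acme\Bundle\DemoBundle namespace and the SampleTopic class are hypothetical and used only for illustration; autoconfigure is the standard Symfony service container option.

```yaml
services:
    _defaults:
        # Standard Symfony option: tags are added automatically for known interfaces.
        autoconfigure: true

    # Hypothetical topic class; no explicit oro_message_queue.topic tag is listed.
    Acme\Bundle\DemoBundle\Async\Topic\SampleTopic: ~
```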

An example of an MQ topic:

 namespace Oro\Bundle\EmailBundle\Async\Topic;

 use Oro\Component\MessageQueue\Topic\AbstractTopic;
 use Symfony\Component\OptionsResolver\OptionsResolver;

 class SendAutoResponseTopic extends AbstractTopic
 {
     public static function getName(): string
     {
         return '';
     }

     public static function getDescription(): string
     {
         return 'Send auto response for single email';
     }

     public function configureMessageBody(OptionsResolver $resolver): void
     {
         // Options must be defined before allowed types can be added for them.
         $resolver
             ->setRequired(['jobId', 'id'])
             ->addAllowedTypes('jobId', 'int')
             ->addAllowedTypes('id', 'int');
     }
 }

The topic is then registered as a tagged service:

 services:
     _defaults:
         tags:
             - { name: oro_message_queue.topic }

     Oro\Bundle\EmailBundle\Async\Topic\SendAutoResponseTopic: ~

An example of an MQ topic that implements Oro\Component\MessageQueue\Topic\JobAwareTopicInterface:

namespace Oro\Bundle\SEOBundle\Async\Topic;

use Oro\Component\MessageQueue\Topic\AbstractTopic;
use Oro\Component\MessageQueue\Topic\JobAwareTopicInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;

class GenerateSitemapTopic extends AbstractTopic implements JobAwareTopicInterface
{
    public static function getName(): string
    {
        return 'oro.seo.generate_sitemap';
    }

    public static function getDescription(): string
    {
        return 'Generates sitemaps for all websites';
    }

    public function configureMessageBody(OptionsResolver $resolver): void
    {
    }

    public function createJobName($messageBody): string
    {
        return self::getName();
    }
}

Message Queue Topics Registry 

The message queue topic registry, Oro\Component\MessageQueue\Topic\TopicRegistry, collects all registered MQ topics. Use this service as the main entry point to get an MQ topic object:

$topic = $this->topicRegistry->get(RootJobStoppedTopic::getName());


The registry returns an Oro\Component\MessageQueue\Topic\NullTopic object if the topic is not registered.

Message Body Validation 

Before a message is pushed to the message queue, the message producer validates its body structure against the options that the message topic defines in the Oro\Component\MessageQueue\Topic\TopicInterface::configureMessageBody() method. A message with an invalid body is not pushed to the message queue; instead, a corresponding error message is logged.


When the %kernel.debug% parameter is true, the message producer throws an Oro\Component\MessageQueue\Consumption\Exception\InvalidMessageBodyException exception.

The message body is also validated during consumption by the MQ extension Oro\Component\MessageQueue\Client\ConsumptionExtension\MessageBodyResolverExtension. A message that contains an invalid body is rejected with a corresponding error log message before it gets to the message processor.


Message body validation is done by Oro\Component\MessageQueue\Client\MessageBodyResolver.

Message body validation ensures that the structure of each message is correct before it comes to the message processor.
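
As a rough illustration of the kind of check described in this section, the sketch below applies rules like those in the SendAutoResponseTopic example directly with Symfony's OptionsResolver. It does not reproduce MessageBodyResolver or any other OroPlatform internals. The isValidBody() helper is hypothetical, and the script assumes that symfony/options-resolver is installed with Composer and that vendor/autoload.php sits next to the script.

```php
<?php

// Assumption: symfony/options-resolver is installed with Composer.
require_once __DIR__ . '/vendor/autoload.php';

use Symfony\Component\OptionsResolver\Exception\ExceptionInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;

/**
 * Hypothetical helper, not part of OroPlatform: returns true when the body
 * satisfies rules that a topic could declare in configureMessageBody().
 */
function isValidBody(array $body): bool
{
    $resolver = new OptionsResolver();
    $resolver
        ->setRequired(['jobId', 'id'])
        ->setAllowedTypes('jobId', 'int')
        ->setAllowedTypes('id', 'int');

    try {
        $resolver->resolve($body);

        return true;
    } catch (ExceptionInterface $e) {
        // Missing, undefined, or wrongly typed options all end up here.
        return false;
    }
}

var_dump(isValidBody(['jobId' => 1, 'id' => 42]));   // bool(true)
var_dump(isValidBody(['jobId' => 1, 'id' => '42'])); // bool(false): 'id' is a string
var_dump(isValidBody(['jobId' => 1]));               // bool(false): 'id' is missing
```

Catching the component's common ExceptionInterface keeps the sketch short. Code that needs to report why a body was rejected would typically inspect the exception message instead of reducing the result to a boolean.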