Important

You are browsing upcoming documentation for version 6.0 of OroCommerce, OroCRM, and OroPlatform, scheduled for release in 2024. Read version 5.1 (the latest LTS version) of the Oro documentation to get up-to-date information.

See our Release Process documentation for more information on the currently supported and upcoming releases.

Message Queue Topics 

Each message sent to the message queue (MQ) needs a topic that denotes the kind of data it contains, so that a message consumer can handle the message appropriately. In other words, a topic is a message type.

Declaring MQ Topic 

To declare an MQ topic, create a class that implements Oro\Component\MessageQueue\Topic\TopicInterface and register it as a service with the oro_message_queue.topic tag. The class must declare the following methods:

  • getName - to provide a topic name that will be used within the message queue transport. It usually consists of lowercase characters, digits, dots, and underscores.

  • getDescription - to provide a human-readable topic description. It is usually not longer than 80 characters.

  • getDefaultPriority - to provide a default priority for a message in the specified queue. It must return a constant from Oro\Component\MessageQueue\Client\MessagePriority.

  • configureMessageBody - to configure Symfony\Component\OptionsResolver\OptionsResolver that is used to validate the message body structure: provide a set of defined and required elements, their default values, allowed types and values, etc.

Note

You can use Oro\Component\MessageQueue\Topic\AbstractTopic as the base class for a new topic.

Note

If the processor subscribed to a topic creates jobs, it is recommended to implement Oro\Component\MessageQueue\Topic\JobAwareTopicInterface for the topic. The topic must then declare the createJobName method, which creates a unique job name for the topic. With this implementation, the job is created immediately after the message is created.

Note

You do not have to explicitly declare the topic as a tagged service if the autoconfigure service container setting is on.

An example of an MQ topic:

Oro/Bundle/EmailBundle/Async/Topic/SendAutoResponseTopic.php 
 namespace Oro\Bundle\EmailBundle\Async\Topic;

 use Oro\Component\MessageQueue\Topic\AbstractTopic;
 use Symfony\Component\OptionsResolver\OptionsResolver;

 class SendAutoResponseTopic extends AbstractTopic
 {
     public static function getName(): string
     {
         return 'oro.email.send_auto_response';
     }

     public static function getDescription(): string
     {
         return 'Send auto response for single email';
     }

     public function configureMessageBody(OptionsResolver $resolver): void
     {
         $resolver
             ->setRequired([
                 'jobId',
                 'id'
             ])
             ->addAllowedTypes('jobId', 'int')
             ->addAllowedTypes('id', 'int');
     }
 }
Resources/config/mq_topics.yml 
 services:
     _defaults:
         tags:
             - { name: oro_message_queue.topic }

     Oro\Bundle\EmailBundle\Async\Topic\SendAutoResponseTopic: ~
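
The example above inherits its default priority from AbstractTopic. As a rough sketch of the getDefaultPriority method described earlier, a topic could raise the priority of its messages as shown here. The class, namespace, topic name, and body fields are hypothetical. The sketch also assumes that the method receives the queue name as a string and that MessagePriority defines a HIGH constant.

```php
<?php

namespace Acme\Bundle\DemoBundle\Async\Topic;

use Oro\Component\MessageQueue\Client\MessagePriority;
use Oro\Component\MessageQueue\Topic\AbstractTopic;
use Symfony\Component\OptionsResolver\OptionsResolver;

/**
 * Hypothetical topic, shown for illustration only.
 */
class SendUrgentNotificationTopic extends AbstractTopic
{
    public static function getName(): string
    {
        return 'acme.demo.send_urgent_notification';
    }

    public static function getDescription(): string
    {
        return 'Send an urgent notification to a single user';
    }

    // Assumed signature: the queue name is passed in so that the priority
    // could differ per queue; this sketch returns the same value for all.
    public function getDefaultPriority(string $queueName): string
    {
        return MessagePriority::HIGH;
    }

    public function configureMessageBody(OptionsResolver $resolver): void
    {
        // Both elements are required and must have the listed types.
        $resolver
            ->setRequired(['userId', 'text'])
            ->addAllowedTypes('userId', 'int')
            ->addAllowedTypes('text', 'string');
    }
}
```

A message producer may still set an explicit priority on an individual message; the value returned here is only the default.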

An example of an MQ topic that implements Oro\Component\MessageQueue\Topic\JobAwareTopicInterface:

Oro/Bundle/SEOBundle/Async/Topic/GenerateSitemapTopic.php 
namespace Oro\Bundle\SEOBundle\Async\Topic;

use Oro\Component\MessageQueue\Topic\AbstractTopic;
use Oro\Component\MessageQueue\Topic\JobAwareTopicInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;

class GenerateSitemapTopic extends AbstractTopic implements JobAwareTopicInterface
{
    public static function getName(): string
    {
        return 'oro.seo.generate_sitemap';
    }

    public static function getDescription(): string
    {
        return 'Generates sitemaps for all websites';
    }

    public function configureMessageBody(OptionsResolver $resolver): void
    {
    }

    public function createJobName($messageBody): string
    {
        return self::getName();
    }
}
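
GenerateSitemapTopic returns the same job name for every message. When the job name should depend on the message, createJobName can build it from the message body. The following is a minimal sketch; the class, namespace, topic name, and the websiteId element are hypothetical.

```php
<?php

namespace Acme\Bundle\DemoBundle\Async\Topic;

use Oro\Component\MessageQueue\Topic\AbstractTopic;
use Oro\Component\MessageQueue\Topic\JobAwareTopicInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;

/**
 * Hypothetical topic, shown for illustration only.
 */
class ReindexWebsiteTopic extends AbstractTopic implements JobAwareTopicInterface
{
    public static function getName(): string
    {
        return 'acme.demo.reindex_website';
    }

    public static function getDescription(): string
    {
        return 'Reindexes the data of a single website';
    }

    public function configureMessageBody(OptionsResolver $resolver): void
    {
        $resolver
            ->setRequired('websiteId')
            ->addAllowedTypes('websiteId', 'int');
    }

    public function createJobName($messageBody): string
    {
        // One job name per website, for example "acme.demo.reindex_website:3".
        return sprintf('%s:%d', self::getName(), $messageBody['websiteId']);
    }
}
```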

Message Queue Topics Registry 

The message queue topic registry, Oro\Component\MessageQueue\Topic\TopicRegistry, collects all registered MQ topics. Use this service as the main entry point to get an MQ topic object:

$topic = $this->topicRegistry->get(RootJobStoppedTopic::getName());

Note

The registry returns an Oro\Component\MessageQueue\Topic\NullTopic object if the topic is not registered.
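
Because an unregistered name yields a NullTopic rather than an error, calling code may want to check for it explicitly. The following sketch shows one way to do so. The TopicDescriptionProvider class and its namespace are hypothetical, and the sketch assumes the registry is injected through the constructor.

```php
<?php

namespace Acme\Bundle\DemoBundle\Async;

use Oro\Component\MessageQueue\Topic\NullTopic;
use Oro\Component\MessageQueue\Topic\TopicRegistry;

/**
 * Hypothetical helper, shown for illustration only.
 */
class TopicDescriptionProvider
{
    private TopicRegistry $topicRegistry;

    public function __construct(TopicRegistry $topicRegistry)
    {
        $this->topicRegistry = $topicRegistry;
    }

    /**
     * Returns the topic description, or null if the topic is not registered.
     */
    public function getDescription(string $topicName): ?string
    {
        $topic = $this->topicRegistry->get($topicName);

        // An unregistered topic name is represented by NullTopic.
        if ($topic instanceof NullTopic) {
            return null;
        }

        return $topic::getDescription();
    }
}
```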

Message Body Validation 

Before a message is pushed to the message queue, the message producer validates its message body structure according to the options defined by the message topic in the Oro\Component\MessageQueue\Topic\TopicInterface::configureMessageBody() method. A message that contains an invalid body is not pushed to the message queue; instead, a corresponding error message is logged.

Note

When the %kernel.debug% parameter is true, the message producer throws the Oro\Component\MessageQueue\Consumption\Exception\InvalidMessageBodyException exception.

The message body is also validated during consumption by the MQ extension Oro\Component\MessageQueue\Client\ConsumptionExtension\MessageBodyResolverExtension. A message that contains an invalid body is rejected with a corresponding error log message before it gets to the message processor.

Note

Message body validation is done by Oro\Component\MessageQueue\Client\MessageBodyResolver.

Message body validation ensures that the structure of each message is correct before it comes to the message processor.
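
To illustrate the kind of check involved, the following sketch applies the rules from SendAutoResponseTopic::configureMessageBody() directly with the Symfony OptionsResolver, outside the message queue. It is not the code that MessageBodyResolver runs internally; the findBodyError function is a hypothetical helper, and the sketch assumes Composer's autoloader is already loaded, as in any Oro application.

```php
<?php

use Symfony\Component\OptionsResolver\Exception\ExceptionInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;

// The same rules as in SendAutoResponseTopic::configureMessageBody().
$resolver = new OptionsResolver();
$resolver
    ->setRequired(['jobId', 'id'])
    ->addAllowedTypes('jobId', 'int')
    ->addAllowedTypes('id', 'int');

/**
 * Hypothetical helper: returns null when the body is valid,
 * or the validation error message otherwise.
 */
function findBodyError(OptionsResolver $resolver, array $body): ?string
{
    try {
        $resolver->resolve($body);

        return null;
    } catch (ExceptionInterface $e) {
        // Missing, undefined, and wrongly typed options all end up here.
        return $e->getMessage();
    }
}

// Valid body: both required elements are present and are integers.
$valid = findBodyError($resolver, ['jobId' => 1, 'id' => 42]);

// Invalid body: the required "jobId" element is missing.
$missing = findBodyError($resolver, ['id' => 42]);

// Invalid body: "id" is a string, but only "int" is allowed.
$wrongType = findBodyError($resolver, ['jobId' => 1, 'id' => '42']);
```

Here $valid is null, while $missing and $wrongType each hold an error message describing the problem.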