Important
You are browsing upcoming documentation for version 6.1 of OroCommerce, scheduled for release in 2025. Read the documentation for version 6.0 (the latest LTS version) to get up-to-date information.
See our Release Process documentation for more information on the currently supported and upcoming releases.
Message Queue Topics
Each message sent to the message queue (MQ) needs a topic
that denotes what kind of data it contains, so that a message consumer
can handle the message appropriately. In other words, a topic
is a message type.
Declaring MQ Topic
To declare an MQ topic, create a class that implements Oro\Component\MessageQueue\Topic\TopicInterface
and register it as a service with the oro_message_queue.topic tag. You have to declare the following methods:
getName - to provide a topic name that will be used within the message queue transport. It usually consists of lowercase characters, digits, dots, and underscores.
getDescription - to provide a human-readable topic description. It is usually not longer than 80 characters.
getDefaultPriority - to provide a default priority for a message in the specified queue. It must return a constant from Oro\Component\MessageQueue\Client\MessagePriority.
configureMessageBody - to configure the Symfony\Component\OptionsResolver\OptionsResolver that is used to validate the message body structure: provide a set of defined and required elements, their default values, allowed types and values, etc.
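The naming and length conventions for getName and getDescription can be expressed as a small check. The sketch below is only an illustration: checkTopicConventions is a hypothetical helper that is not part of OroPlatform, and the rules it applies are the "usual" conventions described above, not constraints enforced by the message queue.

```php
<?php

/**
 * Hypothetical helper (not part of OroPlatform): returns a list of warnings
 * when a topic name or description departs from the usual conventions.
 */
function checkTopicConventions(string $name, string $description): array
{
    $warnings = [];

    // Topic names usually consist of lowercase characters, digits, dots, and underscores.
    if (preg_match('/^[a-z0-9._]+$/', $name) !== 1) {
        $warnings[] = 'name should use only lowercase characters, digits, dots, and underscores';
    }

    // Descriptions are usually not longer than 80 characters.
    // strlen() counts bytes, which is sufficient for ASCII descriptions.
    if (strlen($description) > 80) {
        $warnings[] = 'description should not be longer than 80 characters';
    }

    return $warnings;
}
```

For example, checkTopicConventions('oro.email.send_auto_response', 'Send auto response for single email') returns an empty array, while a name such as 'Oro.Email' produces one warning.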
Note
You can use Oro\Component\MessageQueue\Topic\AbstractTopic
as the base class for a new topic.
Note
If the processor subscribed to a topic creates jobs, it is recommended to implement Oro\Component\MessageQueue\Topic\JobAwareTopicInterface
for the topic.
You must then declare the createJobName method, which creates a unique job name for the topic.
With this implementation, the job is created immediately after the message is created.
Note
You do not have to explicitly declare the topic as a tagged service if the autoconfigure
service container setting is on.
An example of an MQ topic:
namespace Oro\Bundle\EmailBundle\Async\Topic;

use Oro\Component\MessageQueue\Topic\AbstractTopic;
use Symfony\Component\OptionsResolver\OptionsResolver;

class SendAutoResponseTopic extends AbstractTopic
{
    public static function getName(): string
    {
        return 'oro.email.send_auto_response';
    }

    public static function getDescription(): string
    {
        return 'Send auto response for single email';
    }

    public function configureMessageBody(OptionsResolver $resolver): void
    {
        $resolver
            ->setRequired([
                'jobId',
                'id'
            ])
            ->addAllowedTypes('jobId', 'int')
            ->addAllowedTypes('id', 'int');
    }
}
services:
    _defaults:
        tags:
            - { name: oro_message_queue.topic }

    Oro\Bundle\EmailBundle\Async\Topic\SendAutoResponseTopic: ~
An example of an MQ topic that implements Oro\Component\MessageQueue\Topic\JobAwareTopicInterface:
namespace Oro\Bundle\SEOBundle\Async\Topic;

use Oro\Component\MessageQueue\Topic\AbstractTopic;
use Oro\Component\MessageQueue\Topic\JobAwareTopicInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;

class GenerateSitemapTopic extends AbstractTopic implements JobAwareTopicInterface
{
    public static function getName(): string
    {
        return 'oro.seo.generate_sitemap';
    }

    public static function getDescription(): string
    {
        return 'Generates sitemaps for all websites';
    }

    public function configureMessageBody(OptionsResolver $resolver): void
    {
    }

    public function createJobName($messageBody): string
    {
        return self::getName();
    }
}
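The example above uses the topic name itself as the job name. When a job should be unique per message rather than per topic, the job name could also incorporate a value from the message body. The following is a minimal standalone sketch of that idea, not code taken from OroPlatform: the ExampleJobNameBuilder class, the acme.demo.process_entity topic name, and the id body key are all hypothetical.

```php
<?php

/**
 * Hypothetical standalone illustration. In a real topic, this logic would live
 * in the createJobName() method of a class that extends AbstractTopic and
 * implements JobAwareTopicInterface.
 */
final class ExampleJobNameBuilder
{
    public static function createJobName(string $topicName, array $messageBody): string
    {
        // 'id' is an assumed message body key; combining it with the topic
        // name yields a distinct job name for each entity.
        return sprintf('%s:%d', $topicName, $messageBody['id']);
    }
}

// Prints "acme.demo.process_entity:42"
echo ExampleJobNameBuilder::createJobName('acme.demo.process_entity', ['id' => 42]), PHP_EOL;
```

Whether a per-topic or a per-message job name is appropriate depends on whether several jobs for the same topic are allowed to run side by side.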
Message Queue Topics Registry
The message queue topic registry, Oro\Component\MessageQueue\Topic\TopicRegistry,
collects all registered MQ topics. Use this service as the main entry point to get an MQ topic object:
$topic = $this->topicRegistry->get(RootJobStoppedTopic::getName());
Note
The registry returns an Oro\Component\MessageQueue\Topic\NullTopic
object if the topic is not registered.
Message Body Validation
Before a message is pushed to the message queue, the message producer validates its message body structure
according to the options defined by the message topic in the Oro\Component\MessageQueue\Topic\TopicInterface::configureMessageBody()
method. A message that contains an invalid body is not pushed to the message queue; instead, a corresponding error message is logged.
Note
When the %kernel.debug% parameter is true, the message producer throws an Oro\Component\MessageQueue\Consumption\Exception\InvalidMessageBodyException exception.
The message body is also validated during consumption by the MQ extension Oro\Component\MessageQueue\Client\ConsumptionExtension\MessageBodyResolverExtension.
A message that contains an invalid body is rejected with a corresponding error log message before it gets to the message processor.
Note
Message body validation is done by Oro\Component\MessageQueue\Client\MessageBodyResolver.
Message body validation ensures that the structure of each message is correct before it comes to the message processor.
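To illustrate what this validation amounts to, here is a minimal sketch that applies the same OptionsResolver rules as the SendAutoResponseTopic example directly, outside of OroPlatform. The validateBody function is a hypothetical helper, not the actual MessageBodyResolver, and the script assumes that symfony/options-resolver is installed via Composer.

```php
<?php

// Assumption: symfony/options-resolver is installed via Composer.
require __DIR__ . '/vendor/autoload.php';

use Symfony\Component\OptionsResolver\Exception\ExceptionInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;

/**
 * Hypothetical helper (not part of OroPlatform): returns null when the body
 * is valid, or the validation error message otherwise.
 */
function validateBody(array $body): ?string
{
    // Same rules as configureMessageBody() in the SendAutoResponseTopic example.
    $resolver = new OptionsResolver();
    $resolver
        ->setRequired(['jobId', 'id'])
        ->addAllowedTypes('jobId', 'int')
        ->addAllowedTypes('id', 'int');

    try {
        $resolver->resolve($body);

        return null;
    } catch (ExceptionInterface $e) {
        // Covers missing, undefined, and wrongly typed options.
        return $e->getMessage();
    }
}

var_dump(validateBody(['jobId' => 1, 'id' => 42]));   // valid body
var_dump(validateBody(['id' => 42]));                 // "jobId" is missing
var_dump(validateBody(['jobId' => 1, 'id' => '42'])); // "id" has the wrong type
```

The first call yields null, while the other two yield an error message describing the missing option and the type mismatch respectively.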