Message Queue Topics
Each message sent to the message queue (MQ) needs a topic
that denotes what kind of data it contains, so that a message consumer
can handle the message appropriately. In other words, a topic
is a message type.
Declaring MQ Topic
To declare an MQ topic, create a class that implements Oro\Component\MessageQueue\Topic\TopicInterface
and register it as a service with the oro_message_queue.topic tag.
The class must declare the following methods:
getName - to provide a topic name that will be used within the message queue transport. It usually consists of lowercase characters, digits, dots, and underscores.
getDescription - to provide a human-readable topic description. It is usually not longer than 80 characters.
getDefaultPriority - to provide a default priority for a message in the specified queue. It must return a constant from Oro\Component\MessageQueue\Client\MessagePriority.
configureMessageBody - to configure the Symfony\Component\OptionsResolver\OptionsResolver that is used to validate the message body structure: provide a set of defined and required elements, their default values, allowed types and values, etc.
Note
You can use Oro\Component\MessageQueue\Topic\AbstractTopic
as the base class for a new topic.
Note
If the processor subscribed to a topic creates jobs, it is recommended that the topic implement Oro\Component\MessageQueue\Topic\JobAwareTopicInterface.
The topic must then declare the createJobName method, which creates a unique job name for the topic.
With this implementation, the job is created immediately after the message is created.
Note
You do not have to explicitly declare the topic as a tagged service if the autoconfigure
service container setting is on.
An example of an MQ topic:
namespace Oro\Bundle\EmailBundle\Async\Topic;

use Oro\Component\MessageQueue\Topic\AbstractTopic;
use Symfony\Component\OptionsResolver\OptionsResolver;

class SendAutoResponseTopic extends AbstractTopic
{
    public static function getName(): string
    {
        return 'oro.email.send_auto_response';
    }

    public static function getDescription(): string
    {
        return 'Send auto response for single email';
    }

    public function configureMessageBody(OptionsResolver $resolver): void
    {
        // Both elements are required and must be integers.
        $resolver
            ->setRequired([
                'jobId',
                'id'
            ])
            ->addAllowedTypes('jobId', 'int')
            ->addAllowedTypes('id', 'int');
    }
}
services:
    _defaults:
        tags:
            - { name: oro_message_queue.topic }

    Oro\Bundle\EmailBundle\Async\Topic\SendAutoResponseTopic: ~
An example of an MQ topic that implements Oro\Component\MessageQueue\Topic\JobAwareTopicInterface:
namespace Oro\Bundle\SEOBundle\Async\Topic;

use Oro\Component\MessageQueue\Topic\AbstractTopic;
use Oro\Component\MessageQueue\Topic\JobAwareTopicInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;

class GenerateSitemapTopic extends AbstractTopic implements JobAwareTopicInterface
{
    public static function getName(): string
    {
        return 'oro.seo.generate_sitemap';
    }

    public static function getDescription(): string
    {
        return 'Generates sitemaps for all websites';
    }

    public function configureMessageBody(OptionsResolver $resolver): void
    {
        // This topic defines no message body options.
    }

    public function createJobName($messageBody): string
    {
        // The topic name doubles as the job name.
        return self::getName();
    }
}
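The example above returns the topic name as the job name, so every message of this topic maps to the same job name. Where jobs need to be told apart by message content, the name could instead be derived from the message body. The following is only a minimal sketch of that idea: the buildJobName helper and the websiteId element are hypothetical and are not part of Oro.

```php
<?php

/**
 * Hypothetical helper (not part of Oro): builds a job name from the topic
 * name and one value taken from the message body.
 */
function buildJobName(string $topicName, array $messageBody): string
{
    // 'websiteId' is an illustrative key; use whatever element identifies
    // the unit of work in your own message body.
    return sprintf('%s:%d', $topicName, $messageBody['websiteId']);
}

echo buildJobName('oro.seo.generate_sitemap', ['websiteId' => 5]), PHP_EOL;
```

Inside a topic class, the same expression could form the body of createJobName, with self::getName() supplying the topic name.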
Message Queue Topics Registry
The message queue topic registry, Oro\Component\MessageQueue\Topic\TopicRegistry,
collects all registered MQ topics. Use this service as the main entry point to get an MQ topic object:
$topic = $this->topicRegistry->get(RootJobStoppedTopic::getName());
Note
The registry returns an Oro\Component\MessageQueue\Topic\NullTopic
object if the topic is not registered.
Message Body Validation
Before a message is pushed to the message queue, the message producer validates its body structure
against the options defined by the message topic in the Oro\Component\MessageQueue\Topic\TopicInterface::configureMessageBody()
method. A message with an invalid body is not pushed to the message queue; instead, a corresponding error message is logged.
Note
When the %kernel.debug% parameter is true, the message producer throws an Oro\Component\MessageQueue\Consumption\Exception\InvalidMessageBodyException exception.
The message body is also validated during consumption by the MQ extension Oro\Component\MessageQueue\Client\ConsumptionExtension\MessageBodyResolverExtension.
A message that contains an invalid body is rejected with a corresponding error log message before it gets to the message processor.
Note
Message body validation is performed by Oro\Component\MessageQueue\Client\MessageBodyResolver.
Message body validation ensures that the structure of each message is correct before it comes to the message processor.
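To illustrate what such validation amounts to, the sketch below applies the same OptionsResolver rules as the SendAutoResponseTopic example to a valid and an invalid body. It uses the Symfony OptionsResolver component directly rather than Oro's MessageBodyResolver. The resolveMessageBody helper is hypothetical, and the autoload path assumes a Composer project with symfony/options-resolver installed.

```php
<?php

// Assumption: run from a Composer project root that has
// symfony/options-resolver installed.
require __DIR__ . '/vendor/autoload.php';

use Symfony\Component\OptionsResolver\Exception\ExceptionInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;

/**
 * Hypothetical helper (not part of Oro): applies the rules from the
 * SendAutoResponseTopic example and returns the resolved body, or null
 * when the body is invalid.
 */
function resolveMessageBody(array $body): ?array
{
    $resolver = new OptionsResolver();
    $resolver
        ->setRequired(['jobId', 'id'])
        ->addAllowedTypes('jobId', 'int')
        ->addAllowedTypes('id', 'int');

    try {
        return $resolver->resolve($body);
    } catch (ExceptionInterface $e) {
        // Missing, undefined, and wrongly typed elements all end up here.
        error_log('Invalid message body: ' . $e->getMessage());

        return null;
    }
}

// Valid: both required elements are present and are integers.
var_dump(resolveMessageBody(['jobId' => 1, 'id' => 42]) !== null);

// Invalid: 'jobId' is missing and 'id' is a string.
var_dump(resolveMessageBody(['id' => '42']) !== null);
```

Returning null and logging mirrors the behaviour described above, where an invalid message is logged rather than delivered; how Oro itself reports the error is not shown here.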