Message Queue Topics
Each message sent to the message queue (MQ) needs a topic
that denotes the kind of data it contains, so that a message consumer
can handle the message appropriately. In other words, a topic
is a message type.
Declaring MQ Topic
To declare an MQ topic, create a class that implements Oro\Component\MessageQueue\Topic\TopicInterface
and register it as a service with the oro_message_queue.topic tag.
You have to declare the following methods:
- getName - to provide a topic name that will be used within the message queue transport. It usually consists of lowercase characters, digits, dots, and underscores.
- getDescription - to provide a human-readable topic description. It is usually not longer than 80 characters.
- getDefaultPriority - to provide a default priority for a message in the specified queue. It must return a constant from Oro\Component\MessageQueue\Client\MessagePriority.
- configureMessageBody - to configure the Symfony\Component\OptionsResolver\OptionsResolver that is used to validate the message body structure: provide a set of defined and required elements, their default values, allowed types and values, etc.
Note
You can use Oro\Component\MessageQueue\Topic\AbstractTopic
as the base class for a new topic.
Note
You do not have to explicitly declare the topic as a tagged service if the autoconfigure
service container setting is on.
An example of an MQ topic:
namespace Oro\Bundle\EmailBundle\Async\Topic;

use Oro\Component\MessageQueue\Topic\AbstractTopic;
use Symfony\Component\OptionsResolver\OptionsResolver;

class SendAutoResponseTopic extends AbstractTopic
{
    public static function getName(): string
    {
        return 'oro.email.send_auto_response';
    }

    public static function getDescription(): string
    {
        return 'Send auto response for single email';
    }

    public function configureMessageBody(OptionsResolver $resolver): void
    {
        // Both elements are required and must be integers.
        $resolver
            ->setRequired([
                'jobId',
                'id'
            ])
            ->addAllowedTypes('jobId', 'int')
            ->addAllowedTypes('id', 'int');
    }
}
services:
    _defaults:
        tags:
            - { name: oro_message_queue.topic }

    Oro\Bundle\EmailBundle\Async\Topic\SendAutoResponseTopic: ~
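The example above relies on the defaults of AbstractTopic and only marks elements as required. The following is a minimal sketch of a topic that also overrides the default priority and declares default values and allowed values for the message body. The Acme namespace, the topic name, and the body elements (format, batchSize) are hypothetical, and the getDefaultPriority signature with a $queueName argument is an assumption that should be checked against TopicInterface in your version.

```php
namespace Acme\Bundle\DemoBundle\Async\Topic;

use Oro\Component\MessageQueue\Client\MessagePriority;
use Oro\Component\MessageQueue\Topic\AbstractTopic;
use Symfony\Component\OptionsResolver\OptionsResolver;

// Hypothetical topic used for illustration only.
class ExportOrdersTopic extends AbstractTopic
{
    public static function getName(): string
    {
        // Lowercase characters, digits, dots, and underscores.
        return 'acme.demo.export_orders';
    }

    public static function getDescription(): string
    {
        return 'Export orders to a file';
    }

    // Assumed signature: the queue name is passed in, a MessagePriority constant is returned.
    public function getDefaultPriority(string $queueName): string
    {
        return MessagePriority::LOW;
    }

    public function configureMessageBody(OptionsResolver $resolver): void
    {
        $resolver
            // Required element without a default value.
            ->setRequired(['jobId'])
            // Optional elements; setDefault() also marks them as defined.
            ->setDefault('format', 'csv')
            ->setDefault('batchSize', 100)
            // Allowed types and values.
            ->setAllowedTypes('jobId', 'int')
            ->setAllowedTypes('batchSize', 'int')
            ->setAllowedValues('format', ['csv', 'xlsx']);
    }
}
```

With this configuration, a body of ['jobId' => 7] would resolve with format set to csv and batchSize set to 100, while a body with an unknown format would be treated as invalid.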
Message Queue Topics Registry
The message queue topics registry, Oro\Component\MessageQueue\Topic\TopicRegistry,
collects all registered MQ topics. Use this service as the main entry point to get an MQ topic object:
$topic = $this->topicRegistry->get(RootJobStoppedTopic::getName());
Note
The registry returns an Oro\Component\MessageQueue\Topic\NullTopic
object if the topic is not registered.
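Because an unregistered topic yields a NullTopic rather than an error, calling code may want to check for it explicitly. The following is a minimal sketch of a service that does so; the class TopicDescriptionProvider and its namespace are hypothetical, and it assumes TopicRegistry is injected through the service container.

```php
namespace Acme\Bundle\DemoBundle\Provider;

use Oro\Component\MessageQueue\Topic\NullTopic;
use Oro\Component\MessageQueue\Topic\TopicRegistry;

// Hypothetical helper used for illustration only.
class TopicDescriptionProvider
{
    private TopicRegistry $topicRegistry;

    public function __construct(TopicRegistry $topicRegistry)
    {
        $this->topicRegistry = $topicRegistry;
    }

    /**
     * Returns the description of a registered topic,
     * or null when the registry falls back to NullTopic.
     */
    public function describe(string $topicName): ?string
    {
        $topic = $this->topicRegistry->get($topicName);
        if ($topic instanceof NullTopic) {
            return null;
        }

        // getDescription() is static, so it is called on the topic's class.
        return $topic::getDescription();
    }
}
```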
Message Body Validation
Before a message is pushed to the message queue, the message producer validates its message body structure
according to the options defined by the message topic in the Oro\Component\MessageQueue\Topic\TopicInterface::configureMessageBody()
method. A message that contains an invalid body is not pushed to the message queue; instead, a corresponding error message is logged.
Note
When the %kernel.debug% parameter is true, the message producer throws an
Oro\Component\MessageQueue\Consumption\Exception\InvalidMessageBodyException exception.
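To illustrate where producer-side validation applies, here is a minimal sketch of a service that sends a message for the SendAutoResponseTopic example. The class AutoResponseScheduler and its namespace are hypothetical, and the use of Oro\Component\MessageQueue\Client\MessageProducerInterface with a send($topic, $message) method is an assumption about the producer API that is not described on this page.

```php
namespace Acme\Bundle\DemoBundle\Async;

use Oro\Bundle\EmailBundle\Async\Topic\SendAutoResponseTopic;
use Oro\Component\MessageQueue\Client\MessageProducerInterface;

// Hypothetical service used for illustration only.
class AutoResponseScheduler
{
    private MessageProducerInterface $producer;

    public function __construct(MessageProducerInterface $producer)
    {
        $this->producer = $producer;
    }

    public function schedule(int $jobId, int $emailId): void
    {
        // The body matches the topic definition: both required
        // elements are present and both are integers.
        $this->producer->send(
            SendAutoResponseTopic::getName(),
            ['jobId' => $jobId, 'id' => $emailId]
        );
    }
}
```

If the body omitted id or passed it as a string, it would not satisfy the options configured by the topic, so the message would not be pushed to the queue.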
The message body is also validated during consumption by the MQ extension Oro\Component\MessageQueue\Client\ConsumptionExtension\MessageBodyResolverExtension.
A message that contains an invalid body is rejected with a corresponding error log message before it gets to the message processor.
Note
Message body validation is done by Oro\Component\MessageQueue\Client\MessageBodyResolver.
Message body validation ensures that the structure of each message is correct before it comes to the message processor.
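The effect of such validation can be reproduced with the Symfony OptionsResolver alone. The following is a sketch under the assumption that the resolver is configured exactly as in the SendAutoResponseTopic example; the helper function isValidBody is hypothetical and is not how MessageBodyResolver itself is implemented.

```php
use Symfony\Component\OptionsResolver\Exception\ExceptionInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;

// Same options as in SendAutoResponseTopic::configureMessageBody().
$resolver = new OptionsResolver();
$resolver
    ->setRequired(['jobId', 'id'])
    ->addAllowedTypes('jobId', 'int')
    ->addAllowedTypes('id', 'int');

/**
 * Hypothetical helper: returns true when the body satisfies the options,
 * false when the resolver reports a missing, undefined, or invalid element.
 */
function isValidBody(OptionsResolver $resolver, array $body): bool
{
    try {
        $resolver->resolve($body);

        return true;
    } catch (ExceptionInterface $e) {
        return false;
    }
}

$valid = isValidBody($resolver, ['jobId' => 1, 'id' => 42]);          // all required elements, correct types
$missing = isValidBody($resolver, ['jobId' => 1]);                    // "id" is missing
$wrongType = isValidBody($resolver, ['jobId' => 1, 'id' => '42']);    // "id" is a string, not an int
$unknown = isValidBody($resolver, ['jobId' => 1, 'id' => 42, 'x' => 1]); // "x" is not a defined element
```

Only the first body passes; the other three fail because of a missing required element, a wrong type, and an undefined element respectively.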