Supporting Enqueue

Enqueue is an MIT-licensed open source project whose ongoing development is made possible entirely by the support of the community and our customers. If you'd like to join them, please consider supporting the project.


Enqueue MongoDB message queue transport

This transport lets you use MongoDB as a message queue broker.

Installation

$ composer require enqueue/mongodb 

Create context

<?php
use Enqueue\Mongodb\MongodbConnectionFactory;

// connects to localhost
$connectionFactory = new MongodbConnectionFactory();

// same as above
$factory = new MongodbConnectionFactory('mongodb:');

// same as above
$factory = new MongodbConnectionFactory([]);

$factory = new MongodbConnectionFactory([
    'dsn' => 'mongodb://localhost:27017/db_name',
    'dbname' => 'enqueue',
    'collection_name' => 'enqueue',
    'polling_interval' => '1000',
]);

$context = $factory->createContext();

// if you have enqueue/enqueue library installed you can use a factory to build context from DSN
$context = (new \Enqueue\ConnectionFactoryFactory())->create('mongodb:')->createContext();
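In a deployed application the connection settings usually come from the environment rather than from literals. The following is a minimal sketch of that idea, not part of the library: the helper `mongodbQueueConfig()` and the `ENQUEUE_MONGODB_*` variable names are hypothetical, and only the array keys and default values are taken from the configuration example above.

```php
<?php

/**
 * Hypothetical helper: builds the options array for MongodbConnectionFactory
 * from an array of environment variables, falling back to the values used
 * in the configuration example.
 */
function mongodbQueueConfig(array $env): array
{
    return [
        // 'mongodb:' is the DSN the docs describe as "connects to localhost"
        'dsn' => $env['ENQUEUE_MONGODB_DSN'] ?? 'mongodb:',
        'dbname' => $env['ENQUEUE_MONGODB_DBNAME'] ?? 'enqueue',
        'collection_name' => $env['ENQUEUE_MONGODB_COLLECTION'] ?? 'enqueue',
        'polling_interval' => $env['ENQUEUE_MONGODB_POLLING_INTERVAL'] ?? '1000',
    ];
}

// getenv() without arguments returns all environment variables as an array
$config = mongodbQueueConfig(getenv());

// With enqueue/mongodb installed and autoloaded:
// $context = (new \Enqueue\Mongodb\MongodbConnectionFactory($config))->createContext();
```

Keeping the array construction in a plain function makes it easy to check the defaults without a running MongoDB server.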

Send message to topic

<?php
/** @var \Enqueue\Mongodb\MongodbContext $context */
/** @var \Enqueue\Mongodb\MongodbDestination $fooTopic */

$message = $context->createMessage('Hello world!');

$context->createProducer()->send($fooTopic, $message);

Send message to queue

<?php
/** @var \Enqueue\Mongodb\MongodbContext $context */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */

$message = $context->createMessage('Hello world!');

$context->createProducer()->send($fooQueue, $message);
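The two sending examples assume that `$fooTopic` and `$fooQueue` already exist. A minimal sketch of creating them from the context follows, relying on the `createTopic()` and `createQueue()` methods of the `Interop\Queue\Context` interface. The helper name `sendTo()` and the destination names `foo_topic` and `foo_queue` are hypothetical.

```php
<?php

use Interop\Queue\Context;
use Interop\Queue\Destination;

/**
 * Hypothetical helper: wraps a string body in a message and sends it
 * to the given topic or queue.
 */
function sendTo(Context $context, Destination $destination, string $body): void
{
    $message = $context->createMessage($body);

    $context->createProducer()->send($destination, $message);
}

// Usage, given a $context built as shown in "Create context":
// $fooTopic = $context->createTopic('foo_topic');
// $fooQueue = $context->createQueue('foo_queue');
// sendTo($context, $fooTopic, 'Hello world!');
// sendTo($context, $fooQueue, 'Hello world!');
```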

Send priority message

<?php
/** @var \Enqueue\Mongodb\MongodbContext $context */

$fooQueue = $context->createQueue('foo');

$message = $context->createMessage('Hello world!');

$context->createProducer()
    ->setPriority(5) // the higher priority the sooner a message gets to a consumer
    ->send($fooQueue, $message)
;

Send expiration message

<?php
/** @var \Enqueue\Mongodb\MongodbContext $context */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */

$message = $context->createMessage('Hello world!');

$context->createProducer()
    ->setTimeToLive(60000) // 60 sec
    ->send($fooQueue, $message)
;

Send delayed message

<?php
/** @var \Enqueue\Mongodb\MongodbContext $context */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */

$message = $context->createMessage('Hello world!');

$context->createProducer()
    ->setDeliveryDelay(5000) // 5 sec
    ->send($fooQueue, $message)
;
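The time-to-live and delivery delay in the examples above are given in milliseconds (60000 for 60 sec, 5000 for 5 sec). The sketch below converts from seconds and chains the three producer setters in one call. Both helper names are hypothetical, and combining priority, delay and expiration on a single message is an assumption rather than something this page documents.

```php
<?php

use Interop\Queue\Context;
use Interop\Queue\Destination;

/** Converts seconds to the integer milliseconds the producer setters expect. */
function toMilliseconds(float $seconds): int
{
    return (int) round($seconds * 1000);
}

/**
 * Hypothetical helper: sends $body with a priority, a delivery delay
 * and a time to live, the latter two expressed in seconds.
 */
function sendScheduled(
    Context $context,
    Destination $destination,
    string $body,
    int $priority,
    float $delaySeconds,
    float $ttlSeconds
): void {
    $context->createProducer()
        ->setPriority($priority)
        ->setDeliveryDelay(toMilliseconds($delaySeconds))
        ->setTimeToLive(toMilliseconds($ttlSeconds))
        ->send($destination, $context->createMessage($body));
}

// Usage: priority 5, deliver after 5 sec, expire after 60 sec.
// sendScheduled($context, $context->createQueue('foo'), 'Hello world!', 5, 5, 60);
```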

Consume message

<?php
/** @var \Enqueue\Mongodb\MongodbContext $context */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */

$consumer = $context->createConsumer($fooQueue);

$message = $consumer->receive();

// process a message

$consumer->acknowledge($message);
// $consumer->reject($message);
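As the `Interop\Queue\Consumer` interface describes it, `receive()` with no argument waits until a message arrives, while `receive($timeout)` returns `null` once the timeout in milliseconds expires. A worker loop built on that behaviour might look like the sketch below. The helper name `drainQueue()` is hypothetical, and rejecting a message when the handler throws is one possible policy, not the library's prescribed one.

```php
<?php

use Interop\Queue\Consumer;

/**
 * Hypothetical helper: receives messages until none arrives within $timeoutMs,
 * acknowledging those the handler accepts and rejecting those it throws on.
 * Returns the number of acknowledged messages.
 */
function drainQueue(Consumer $consumer, callable $handler, int $timeoutMs = 1000): int
{
    $processed = 0;

    // receive() returns null when no message arrives within the timeout
    while (null !== ($message = $consumer->receive($timeoutMs))) {
        try {
            $handler($message);
            $consumer->acknowledge($message);
            ++$processed;
        } catch (\Throwable $e) {
            // the handler failed, so the message is rejected instead of acknowledged
            $consumer->reject($message);
        }
    }

    return $processed;
}

// Usage:
// $count = drainQueue($context->createConsumer($fooQueue), function ($message) {
//     echo $message->getBody(), PHP_EOL;
// });
```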

Subscription consumer

<?php
use Interop\Queue\Message;
use Interop\Queue\Consumer;

/** @var \Enqueue\Mongodb\MongodbContext $context */
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */
/** @var \Enqueue\Mongodb\MongodbDestination $barQueue */

$fooConsumer = $context->createConsumer($fooQueue);
$barConsumer = $context->createConsumer($barQueue);

$subscriptionConsumer = $context->createSubscriptionConsumer();
$subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) {
    // process message

    $consumer->acknowledge($message);

    return true;
});
$subscriptionConsumer->subscribe($barConsumer, function(Message $message, Consumer $consumer) {
    // process message

    $consumer->acknowledge($message);

    return true;
});

$subscriptionConsumer->consume(2000); // 2 sec
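Both callbacks above return `true`, which keeps the subscription consumer running until the timeout. Assuming the MongoDB transport follows the usual Enqueue convention that a callback returning `false` stops `consume()` early, a callback that stops after a fixed number of messages could be sketched as follows. The factory name `limitedCallback()` is hypothetical.

```php
<?php

use Interop\Queue\Consumer;
use Interop\Queue\Message;

/**
 * Hypothetical factory: builds a subscription callback that hands each message
 * to $handler, acknowledges it, and returns false once $limit messages were seen.
 */
function limitedCallback(int $limit, callable $handler): \Closure
{
    $seen = 0;

    return function (Message $message, Consumer $consumer) use (&$seen, $limit, $handler): bool {
        $handler($message);
        $consumer->acknowledge($message);

        // true keeps consuming; false is assumed to stop the subscription consumer
        return ++$seen < $limit;
    };
}

// Usage: stop after 10 messages or after 2 sec, whichever comes first.
// $subscriptionConsumer->subscribe($fooConsumer, limitedCallback(10, function (Message $m) {
//     echo $m->getBody(), PHP_EOL;
// }));
// $subscriptionConsumer->consume(2000);
```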
