Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@
use Illuminate\Queue\Console\ListenCommand as QueueListenCommand;
use Illuminate\Queue\Console\ListFailedCommand as ListFailedQueueCommand;
use Illuminate\Queue\Console\MonitorCommand as QueueMonitorCommand;
use Illuminate\Queue\Console\PauseCommand as QueuePauseCommand;
use Illuminate\Queue\Console\PruneBatchesCommand as QueuePruneBatchesCommand;
use Illuminate\Queue\Console\PruneFailedJobsCommand as QueuePruneFailedJobsCommand;
use Illuminate\Queue\Console\RestartCommand as QueueRestartCommand;
use Illuminate\Queue\Console\ResumeCommand as QueueResumeCommand;
use Illuminate\Queue\Console\RetryBatchCommand as QueueRetryBatchCommand;
use Illuminate\Queue\Console\RetryCommand as QueueRetryCommand;
use Illuminate\Queue\Console\TableCommand;
Expand Down Expand Up @@ -150,9 +152,11 @@ class ArtisanServiceProvider extends ServiceProvider implements DeferrableProvid
'QueueForget' => ForgetFailedQueueCommand::class,
'QueueListen' => QueueListenCommand::class,
'QueueMonitor' => QueueMonitorCommand::class,
'QueuePause' => QueuePauseCommand::class,
'QueuePruneBatches' => QueuePruneBatchesCommand::class,
'QueuePruneFailedJobs' => QueuePruneFailedJobsCommand::class,
'QueueRestart' => QueueRestartCommand::class,
'QueueResume' => QueueResumeCommand::class,
'QueueRetry' => QueueRetryCommand::class,
'QueueRetryBatch' => QueueRetryBatchCommand::class,
'QueueWork' => QueueWorkCommand::class,
Expand Down
73 changes: 73 additions & 0 deletions src/Illuminate/Queue/Console/PauseCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php

namespace Illuminate\Queue\Console;

use Illuminate\Console\Command;
use Illuminate\Contracts\Queue\Factory as QueueManager;
use Symfony\Component\Console\Attribute\AsCommand;

#[AsCommand(name: 'queue:pause')]
class PauseCommand extends Command
{
/**
* The console command name.
*
* @var string
*/
protected $signature = 'queue:pause {queue : The name of the queue to pause}';

/**
* The console command description.
*
* @var string
*/
protected $description = 'Pause job processing for a specific queue';

/**
* The queue manager instance.
*
* @var \Illuminate\Contracts\Queue\Factory
*/
protected $manager;

/**
* Create a new queue pause command.
*/
public function __construct(QueueManager $manager)
{
parent::__construct();

$this->manager = $manager;
}

/**
* Execute the console command.
*
* @return int
*/
public function handle()
{
[$connection, $queue] = $this->parseQueue($this->argument('queue'));

$this->manager->pause($connection, $queue);

$this->components->info("Job processing on queue [{$connection}:{$queue}] has been paused.");

return 0;
}

/**
* Parse the queue argument into the connection and queue name.
*
* @param string $queue
* @return array
*/
protected function parseQueue($queue)
{
[$connection, $queue] = array_pad(explode(':', $queue, 2), 2, null);

return isset($queue)
? [$connection, $queue]
: [$this->laravel['config']['queue.default'], $connection];
}
}
73 changes: 73 additions & 0 deletions src/Illuminate/Queue/Console/ResumeCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php

namespace Illuminate\Queue\Console;

use Illuminate\Console\Command;
use Illuminate\Contracts\Queue\Factory as QueueManager;
use Symfony\Component\Console\Attribute\AsCommand;

#[AsCommand(name: 'queue:resume')]
class ResumeCommand extends Command
{
/**
* The console command name.
*
* @var string
*/
protected $signature = 'queue:resume {queue : The name of the queue that should resume processing}';

/**
* The console command description.
*
* @var string
*/
protected $description = 'Resume job processing for a paused queue';

/**
* The queue manager instance.
*
* @var \Illuminate\Contracts\Queue\Factory
*/
protected $manager;

/**
* Create a new queue resume command.
*/
public function __construct(QueueManager $manager)
{
parent::__construct();

$this->manager = $manager;
}

/**
* Execute the console command.
*
* @return int
*/
public function handle()
{
[$connection, $queue] = $this->parseQueue($this->argument('queue'));

$this->manager->resume($connection, $queue);

$this->components->info("Job processing on queue [{$connection}:{$queue}] has been resumed.");

return 0;
}

/**
* Parse the queue argument into connection and queue name.
*
* @param string $queue
* @return array
*/
protected function parseQueue($queue)
{
[$connection, $queue] = array_pad(explode(':', $queue, 2), 2, null);

return isset($queue)
? [$connection, $queue]
: [$this->laravel['config']['queue.default'], $connection];
}
}
42 changes: 42 additions & 0 deletions src/Illuminate/Queue/QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,48 @@ protected function getConnector($driver)
return call_user_func($this->connectors[$driver]);
}

/**
* Pause a queue by its connection and name.
*
* @param string $connection
* @param string $queue
* @return void
*/
public function pause($connection, $queue)
{
$this->app['cache']
->store()
->forever("illuminate:queue:paused:{$connection}:{$queue}", true);
}

/**
* Resume a paused queue by its connection and name.
*
* @param string $connection
* @param string $queue
* @return void
*/
public function resume($connection, $queue)
{
$this->app['cache']
->store()
->forget("illuminate:queue:paused:{$connection}:{$queue}");
}

/**
* Determine if a queue is paused.
*
* @param string $connection
* @param string $queue
* @return bool
*/
public function isPaused($connection, $queue)
{
return (bool) $this->app['cache']
->store()
->get("illuminate:queue:paused:{$connection}:{$queue}", false);
}

/**
* Add a queue connection resolver.
*
Expand Down
18 changes: 18 additions & 0 deletions src/Illuminate/Queue/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,10 @@ protected function getNextJob($connection, $queue)
}

foreach (explode(',', $queue) as $index => $queue) {
if ($this->queuePaused($connection->getConnectionName(), $queue)) {
continue;
}

if (! is_null($job = $popJobCallback($queue, $index))) {
$this->raiseAfterJobPopEvent($connection->getConnectionName(), $job);

Expand All @@ -387,6 +391,20 @@ protected function getNextJob($connection, $queue)
}
}

/**
* Determine if a given connection and queue is paused.
*
* @param string $connectionName
* @param string $queue
* @return bool
*/
protected function queuePaused($connectionName, $queue)
{
return $this->cache && (bool) ($this->cache->get(
"illuminate:queue:paused:{$connectionName}:{$queue}", false
));
}

/**
* Process the given job.
*
Expand Down
3 changes: 3 additions & 0 deletions src/Illuminate/Support/Facades/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@
* @method static array pushedJobs()
* @method static array rawPushes()
* @method static \Illuminate\Support\Testing\Fakes\QueueFake serializeAndRestore(bool $serializeAndRestore = true)
* @method static void pause(string $queue, int $ttl = 86400)
* @method static void resume(string $queue)
* @method static bool isPaused(string $queue)
*
* @see \Illuminate\Queue\QueueManager
* @see \Illuminate\Queue\Queue
Expand Down
102 changes: 102 additions & 0 deletions tests/Queue/QueuePauseResumeTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
<?php

namespace Illuminate\Tests\Queue;

use Illuminate\Cache\ArrayStore;
use Illuminate\Cache\Repository;
use Illuminate\Queue\QueueManager;
use Mockery as m;
use PHPUnit\Framework\TestCase;

class QueuePauseResumeTest extends TestCase
{
protected $manager;
protected $cache;

protected function setUp(): void
{
parent::setUp();

$this->cache = new Repository(new ArrayStore);

// Mock the cache facade to return our cache repository
$cacheMock = m::mock();
$cacheMock->shouldReceive('store')->andReturn($this->cache);

$app = [
'config' => [
'queue.default' => 'redis',
'queue.connections.redis' => ['driver' => 'redis'],
'queue.connections.database' => ['driver' => 'database'],
],
'cache' => $cacheMock,
];

$this->manager = new QueueManager($app);
}

protected function tearDown(): void
{
m::close();
parent::tearDown();
}

public function testPauseQueueWithConnection()
{
$this->manager->pause('redis', 'default');

$this->assertTrue($this->manager->isPaused('redis', 'default'));
}

public function testPauseQueueWithTTL()
{
$this->manager->pause('redis', 'default', 60);

$this->assertTrue($this->manager->isPaused('redis', 'default'));
}

public function testPauseQueueIndefinitely()
{
$this->manager->pause('redis', 'default', null);

$this->assertTrue($this->manager->isPaused('redis', 'default'));
}

public function testResumeQueue()
{
$this->manager->pause('redis', 'default');
$this->assertTrue($this->manager->isPaused('redis', 'default'));

$this->manager->resume('redis', 'default');
$this->assertFalse($this->manager->isPaused('redis', 'default'));
}

public function testPausingQueueOnOneConnectionDoesNotAffectAnother()
{
$this->manager->pause('redis', 'default');

$this->assertTrue($this->manager->isPaused('redis', 'default'));
$this->assertFalse($this->manager->isPaused('database', 'default'));
}

public function testPausingDifferentQueuesOnSameConnection()
{
$this->manager->pause('redis', 'emails');
$this->manager->pause('redis', 'notifications');

$this->assertTrue($this->manager->isPaused('redis', 'emails'));
$this->assertTrue($this->manager->isPaused('redis', 'notifications'));
$this->assertFalse($this->manager->isPaused('redis', 'default'));
}

public function testResumingOnlyAffectsSpecificQueue()
{
$this->manager->pause('redis', 'emails');
$this->manager->pause('redis', 'notifications');

$this->manager->resume('redis', 'emails');

$this->assertFalse($this->manager->isPaused('redis', 'emails'));
$this->assertTrue($this->manager->isPaused('redis', 'notifications'));
}
}