Laravel version update

Laravel version update
This commit is contained in:
Manish Verma
2018-08-06 18:48:58 +05:30
parent d143048413
commit 126fbb0255
13678 changed files with 1031482 additions and 778530 deletions

View File

@@ -45,6 +45,19 @@ class BeanstalkdQueue extends Queue implements QueueContract
$this->pheanstalk = $pheanstalk;
}
/**
* Get the size of the queue.
*
* @param string $queue
* @return int
*/
public function size($queue = null)
{
$queue = $this->getQueue($queue);
return (int) $this->pheanstalk->statsTube($queue)->current_jobs_ready;
}
/**
* Push a new job onto the queue.
*
@@ -76,7 +89,7 @@ class BeanstalkdQueue extends Queue implements QueueContract
/**
* Push a new job onto the queue after a delay.
*
* @param \DateTime|int $delay
* @param \DateTimeInterface|\DateInterval|int $delay
* @param string $job
* @param mixed $data
* @param string $queue
@@ -84,11 +97,14 @@ class BeanstalkdQueue extends Queue implements QueueContract
*/
public function later($delay, $job, $data = '', $queue = null)
{
$payload = $this->createPayload($job, $data);
$pheanstalk = $this->pheanstalk->useTube($this->getQueue($queue));
return $pheanstalk->put($payload, Pheanstalk::DEFAULT_PRIORITY, $this->getSeconds($delay), $this->timeToRun);
return $pheanstalk->put(
$this->createPayload($job, $data),
Pheanstalk::DEFAULT_PRIORITY,
$this->secondsUntil($delay),
$this->timeToRun
);
}
/**
@@ -104,7 +120,9 @@ class BeanstalkdQueue extends Queue implements QueueContract
$job = $this->pheanstalk->watchOnly($queue)->reserve(0);
if ($job instanceof PheanstalkJob) {
return new BeanstalkdJob($this->container, $this->pheanstalk, $job, $queue);
return new BeanstalkdJob(
$this->container, $this->pheanstalk, $job, $this->connectionName, $queue
);
}
}
@@ -117,7 +135,9 @@ class BeanstalkdQueue extends Queue implements QueueContract
*/
public function deleteMessage($queue, $id)
{
$this->pheanstalk->useTube($this->getQueue($queue))->delete($id);
$queue = $this->getQueue($queue);
$this->pheanstalk->useTube($queue)->delete(new PheanstalkJob($id, ''));
}
/**

View File

@@ -2,8 +2,11 @@
namespace Illuminate\Queue;
use Exception;
use ReflectionClass;
use Illuminate\Contracts\Queue\Job;
use Illuminate\Contracts\Bus\Dispatcher;
use Illuminate\Database\Eloquent\ModelNotFoundException;
class CallQueuedHandler
{
@@ -34,17 +37,45 @@ class CallQueuedHandler
*/
public function call(Job $job, array $data)
{
$command = $this->setJobInstanceIfNecessary(
$job, unserialize($data['command'])
try {
$command = $this->setJobInstanceIfNecessary(
$job, unserialize($data['command'])
);
} catch (ModelNotFoundException $e) {
return $this->handleModelNotFound($job, $e);
}
$this->dispatcher->dispatchNow(
$command, $this->resolveHandler($job, $command)
);
$this->dispatcher->dispatchNow($command);
if (! $job->hasFailed() && ! $job->isReleased()) {
$this->ensureNextJobInChainIsDispatched($command);
}
if (! $job->isDeletedOrReleased()) {
$job->delete();
}
}
/**
* Resolve the handler for the given command.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param mixed $command
* @return mixed
*/
protected function resolveHandler($job, $command)
{
$handler = $this->dispatcher->getCommandHandler($command) ?: null;
if ($handler) {
$this->setJobInstanceIfNecessary($job, $handler);
}
return $handler;
}
/**
* Set the job instance of the given class if necessary.
*
@@ -54,7 +85,7 @@ class CallQueuedHandler
*/
protected function setJobInstanceIfNecessary(Job $job, $instance)
{
if (in_array('Illuminate\Queue\InteractsWithQueue', class_uses_recursive(get_class($instance)))) {
if (in_array(InteractsWithQueue::class, class_uses_recursive(get_class($instance)))) {
$instance->setJob($job);
}
@@ -62,17 +93,60 @@ class CallQueuedHandler
}
/**
* Call the failed method on the job instance.
* Ensure the next job in the chain is dispatched if applicable.
*
* @param array $data
* @param mixed $command
* @return void
*/
public function failed(array $data)
protected function ensureNextJobInChainIsDispatched($command)
{
if (method_exists($command, 'dispatchNextJobInChain')) {
$command->dispatchNextJobInChain();
}
}
/**
* Handle a model not found exception.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Exception $e
* @return void
*/
protected function handleModelNotFound(Job $job, $e)
{
$class = $job->resolveName();
try {
$shouldDelete = (new ReflectionClass($class))
->getDefaultProperties()['deleteWhenMissingModels'] ?? false;
} catch (Exception $e) {
$shouldDelete = false;
}
if ($shouldDelete) {
return $job->delete();
}
return FailingJob::handle(
$job->getConnectionName(), $job, $e
);
}
/**
* Call the failed method on the job instance.
*
* The exception that caused the failure will be passed.
*
* @param array $data
* @param \Exception $e
* @return void
*/
public function failed(array $data, $e)
{
$command = unserialize($data['command']);
if (method_exists($command, 'failed')) {
$command->failed();
$command->failed($e);
}
}
}

View File

@@ -7,6 +7,10 @@ use Illuminate\Container\Container;
use Illuminate\Queue\QueueServiceProvider;
use Illuminate\Support\Traits\CapsuleManagerTrait;
/**
* @mixin \Illuminate\Queue\QueueManager
* @mixin \Illuminate\Contracts\Queue\Queue
*/
class Manager
{
use CapsuleManagerTrait;
@@ -112,7 +116,7 @@ class Manager
/**
* Push a new job onto the queue after a delay.
*
* @param \DateTime|int $delay
* @param \DateTimeInterface|\DateInterval|int $delay
* @param string $job
* @param mixed $data
* @param string $queue
@@ -166,7 +170,7 @@ class Manager
*/
public function __call($method, $parameters)
{
return call_user_func_array([$this->manager, $method], $parameters);
return $this->manager->$method(...$parameters);
}
/**
@@ -178,6 +182,6 @@ class Manager
*/
public static function __callStatic($method, $parameters)
{
return call_user_func_array([static::connection(), $method], $parameters);
return static::connection()->$method(...$parameters);
}
}

View File

@@ -2,8 +2,8 @@
namespace Illuminate\Queue\Connectors;
use Pheanstalk\Connection;
use Pheanstalk\Pheanstalk;
use Illuminate\Support\Arr;
use Pheanstalk\PheanstalkInterface;
use Illuminate\Queue\BeanstalkdQueue;
@@ -17,10 +17,24 @@ class BeanstalkdConnector implements ConnectorInterface
*/
public function connect(array $config)
{
$pheanstalk = new Pheanstalk($config['host'], Arr::get($config, 'port', PheanstalkInterface::DEFAULT_PORT));
$retryAfter = $config['retry_after'] ?? Pheanstalk::DEFAULT_TTR;
return new BeanstalkdQueue(
$pheanstalk, $config['queue'], Arr::get($config, 'ttr', Pheanstalk::DEFAULT_TTR)
return new BeanstalkdQueue($this->pheanstalk($config), $config['queue'], $retryAfter);
}
/**
* Create a Pheanstalk instance.
*
* @param array $config
* @return \Pheanstalk\Pheanstalk
*/
protected function pheanstalk(array $config)
{
return new Pheanstalk(
$config['host'],
$config['port'] ?? PheanstalkInterface::DEFAULT_PORT,
$config['timeout'] ?? Connection::DEFAULT_CONNECT_TIMEOUT,
$config['persistent'] ?? false
);
}
}

View File

@@ -2,7 +2,6 @@
namespace Illuminate\Queue\Connectors;
use Illuminate\Support\Arr;
use Illuminate\Queue\DatabaseQueue;
use Illuminate\Database\ConnectionResolverInterface;
@@ -35,10 +34,10 @@ class DatabaseConnector implements ConnectorInterface
public function connect(array $config)
{
return new DatabaseQueue(
$this->connections->connection(Arr::get($config, 'connection')),
$this->connections->connection($config['connection'] ?? null),
$config['table'],
$config['queue'],
Arr::get($config, 'expire', 60)
$config['retry_after'] ?? 60
);
}
}

View File

@@ -2,16 +2,15 @@
namespace Illuminate\Queue\Connectors;
use Illuminate\Support\Arr;
use Illuminate\Redis\Database;
use Illuminate\Queue\RedisQueue;
use Illuminate\Contracts\Redis\Factory as Redis;
class RedisConnector implements ConnectorInterface
{
/**
* The Redis database instance.
*
* @var \Illuminate\Redis\Database
* @var \Illuminate\Contracts\Redis\Factory
*/
protected $redis;
@@ -25,11 +24,11 @@ class RedisConnector implements ConnectorInterface
/**
* Create a new Redis queue connector instance.
*
* @param \Illuminate\Redis\Database $redis
* @param \Illuminate\Contracts\Redis\Factory $redis
* @param string|null $connection
* @return void
*/
public function __construct(Database $redis, $connection = null)
public function __construct(Redis $redis, $connection = null)
{
$this->redis = $redis;
$this->connection = $connection;
@@ -43,12 +42,10 @@ class RedisConnector implements ConnectorInterface
*/
public function connect(array $config)
{
$queue = new RedisQueue(
$this->redis, $config['queue'], Arr::get($config, 'connection', $this->connection)
return new RedisQueue(
$this->redis, $config['queue'],
$config['connection'] ?? $this->connection,
$config['retry_after'] ?? 60
);
$queue->setExpire(Arr::get($config, 'expire', 60));
return $queue;
}
}

View File

@@ -23,7 +23,7 @@ class SqsConnector implements ConnectorInterface
}
return new SqsQueue(
new SqsClient($config), $config['queue'], Arr::get($config, 'prefix', '')
new SqsClient($config), $config['queue'], $config['prefix'] ?? ''
);
}

View File

@@ -55,20 +55,14 @@ class FailedTableCommand extends Command
*
* @return void
*/
public function fire()
public function handle()
{
$table = $this->laravel['config']['queue.failed.table'];
$tableClassName = Str::studly($table);
$fullPath = $this->createBaseMigration($table);
$stub = str_replace(
['{{table}}', '{{tableClassName}}'], [$table, $tableClassName], $this->files->get(__DIR__.'/stubs/failed_jobs.stub')
$this->replaceMigration(
$this->createBaseMigration($table), $table, Str::studly($table)
);
$this->files->put($fullPath, $stub);
$this->info('Migration created successfully!');
$this->composer->dumpAutoloads();
@@ -82,10 +76,27 @@ class FailedTableCommand extends Command
*/
protected function createBaseMigration($table = 'failed_jobs')
{
$name = 'create_'.$table.'_table';
return $this->laravel['migration.creator']->create(
'create_'.$table.'_table', $this->laravel->databasePath().'/migrations'
);
}
$path = $this->laravel->databasePath().'/migrations';
/**
* Replace the generated migration with the failed job table stub.
*
* @param string $path
* @param string $table
* @param string $tableClassName
* @return void
*/
protected function replaceMigration($path, $table, $tableClassName)
{
$stub = str_replace(
['{{table}}', '{{tableClassName}}'],
[$table, $tableClassName],
$this->files->get(__DIR__.'/stubs/failed_jobs.stub')
);
return $this->laravel['migration.creator']->create($name, $path);
$this->files->put($path, $stub);
}
}

View File

@@ -25,7 +25,7 @@ class FlushFailedCommand extends Command
*
* @return void
*/
public function fire()
public function handle()
{
$this->laravel['queue.failer']->flush();

View File

@@ -3,16 +3,15 @@
namespace Illuminate\Queue\Console;
use Illuminate\Console\Command;
use Symfony\Component\Console\Input\InputArgument;
class ForgetFailedCommand extends Command
{
/**
* The console command name.
* The console command signature.
*
* @var string
*/
protected $name = 'queue:forget';
protected $signature = 'queue:forget {id : The ID of the failed job.}';
/**
* The console command description.
@@ -26,7 +25,7 @@ class ForgetFailedCommand extends Command
*
* @return void
*/
public function fire()
public function handle()
{
if ($this->laravel['queue.failer']->forget($this->argument('id'))) {
$this->info('Failed job deleted successfully!');
@@ -34,16 +33,4 @@ class ForgetFailedCommand extends Command
$this->error('No failed job matches the given ID.');
}
}
/**
* Get the console command arguments.
*
* @return array
*/
protected function getArguments()
{
return [
['id', InputArgument::REQUIRED, 'The ID of the failed job'],
];
}
}

View File

@@ -33,11 +33,9 @@ class ListFailedCommand extends Command
*
* @return void
*/
public function fire()
public function handle()
{
$jobs = $this->getFailedJobs();
if (count($jobs) == 0) {
if (count($jobs = $this->getFailedJobs()) == 0) {
return $this->info('No failed jobs!');
}
@@ -51,13 +49,11 @@ class ListFailedCommand extends Command
*/
protected function getFailedJobs()
{
$results = [];
$failed = $this->laravel['queue.failer']->all();
foreach ($this->laravel['queue.failer']->all() as $failed) {
$results[] = $this->parseFailedJob((array) $failed);
}
return array_filter($results);
return collect($failed)->map(function ($failed) {
return $this->parseFailedJob((array) $failed);
})->filter()->all();
}
/**
@@ -68,7 +64,7 @@ class ListFailedCommand extends Command
*/
protected function parseFailedJob(array $failed)
{
$row = array_values(Arr::except($failed, ['payload']));
$row = array_values(Arr::except($failed, ['payload', 'exception']));
array_splice($row, 3, 0, $this->extractJobName($failed['payload']));
@@ -86,18 +82,27 @@ class ListFailedCommand extends Command
$payload = json_decode($payload, true);
if ($payload && (! isset($payload['data']['command']))) {
return Arr::get($payload, 'job');
return $payload['job'] ?? null;
} elseif ($payload && isset($payload['data']['command'])) {
return $this->matchJobName($payload);
}
}
/**
* Match the job name from the payload.
*
* @param array $payload
* @return string
*/
protected function matchJobName($payload)
{
preg_match('/"([^"]+)"/', $payload['data']['command'], $matches);
if (isset($matches[1])) {
return $matches[1];
}
if ($payload && isset($payload['data']['command'])) {
preg_match('/"([^"]+)"/', $payload['data']['command'], $matches);
if (isset($matches[1])) {
return $matches[1];
} else {
return Arr::get($payload, 'job');
}
}
return $payload['job'] ?? null;
}
/**

View File

@@ -4,8 +4,7 @@ namespace Illuminate\Queue\Console;
use Illuminate\Queue\Listener;
use Illuminate\Console\Command;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputArgument;
use Illuminate\Queue\ListenerOptions;
class ListenCommand extends Command
{
@@ -14,7 +13,15 @@ class ListenCommand extends Command
*
* @var string
*/
protected $name = 'queue:listen';
protected $signature = 'queue:listen
{connection? : The name of connection}
{--delay=0 : The number of seconds to delay failed jobs}
{--force : Force the worker to run even in maintenance mode}
{--memory=128 : The memory limit in megabytes}
{--queue= : The queue to listen on}
{--sleep=3 : Number of seconds to sleep when no job is available}
{--timeout=60 : The number of seconds a child process can run}
{--tries=0 : Number of times to attempt a job before logging it failed}';
/**
* The console command description.
@@ -40,7 +47,7 @@ class ListenCommand extends Command
{
parent::__construct();
$this->listener = $listener;
$this->setOutputHandler($this->listener = $listener);
}
/**
@@ -48,28 +55,17 @@ class ListenCommand extends Command
*
* @return void
*/
public function fire()
public function handle()
{
$this->setListenerOptions();
$delay = $this->input->getOption('delay');
// The memory limit is the amount of memory we will allow the script to occupy
// before killing it and letting a process manager restart it for us, which
// is to protect us against any memory leaks that will be in the scripts.
$memory = $this->input->getOption('memory');
$connection = $this->input->getArgument('connection');
$timeout = $this->input->getOption('timeout');
// We need to get the right queue for the connection which is set in the queue
// configuration file for the application. We will pull it based on the set
// connection being run for the queue operation currently being executed.
$queue = $this->getQueue($connection);
$queue = $this->getQueue(
$connection = $this->input->getArgument('connection')
);
$this->listener->listen(
$connection, $queue, $delay, $memory, $timeout
$connection, $queue, $this->gatherOptions()
);
}
@@ -81,64 +77,38 @@ class ListenCommand extends Command
*/
protected function getQueue($connection)
{
if (is_null($connection)) {
$connection = $this->laravel['config']['queue.default'];
}
$connection = $connection ?: $this->laravel['config']['queue.default'];
$queue = $this->laravel['config']->get("queue.connections.{$connection}.queue", 'default');
return $this->input->getOption('queue') ?: $this->laravel['config']->get(
"queue.connections.{$connection}.queue", 'default'
);
}
return $this->input->getOption('queue') ?: $queue;
/**
* Get the listener options for the command.
*
* @return \Illuminate\Queue\ListenerOptions
*/
protected function gatherOptions()
{
return new ListenerOptions(
$this->option('env'), $this->option('delay'),
$this->option('memory'), $this->option('timeout'),
$this->option('sleep'), $this->option('tries'),
$this->option('force')
);
}
/**
* Set the options on the queue listener.
*
* @param \Illuminate\Queue\Listener $listener
* @return void
*/
protected function setListenerOptions()
protected function setOutputHandler(Listener $listener)
{
$this->listener->setEnvironment($this->laravel->environment());
$this->listener->setSleep($this->option('sleep'));
$this->listener->setMaxTries($this->option('tries'));
$this->listener->setOutputHandler(function ($type, $line) {
$listener->setOutputHandler(function ($type, $line) {
$this->output->write($line);
});
}
/**
* Get the console command arguments.
*
* @return array
*/
protected function getArguments()
{
return [
['connection', InputArgument::OPTIONAL, 'The name of connection'],
];
}
/**
* Get the console command options.
*
* @return array
*/
protected function getOptions()
{
return [
['queue', null, InputOption::VALUE_OPTIONAL, 'The queue to listen on', null],
['delay', null, InputOption::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0],
['memory', null, InputOption::VALUE_OPTIONAL, 'The memory limit in megabytes', 128],
['timeout', null, InputOption::VALUE_OPTIONAL, 'Seconds a job may run before timing out', 60],
['sleep', null, InputOption::VALUE_OPTIONAL, 'Seconds to wait before checking queue for jobs', 3],
['tries', null, InputOption::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0],
];
}
}

View File

@@ -3,9 +3,12 @@
namespace Illuminate\Queue\Console;
use Illuminate\Console\Command;
use Illuminate\Support\InteractsWithTime;
class RestartCommand extends Command
{
use InteractsWithTime;
/**
* The console command name.
*
@@ -25,9 +28,9 @@ class RestartCommand extends Command
*
* @return void
*/
public function fire()
public function handle()
{
$this->laravel['cache']->forever('illuminate:queue:restart', time());
$this->laravel['cache']->forever('illuminate:queue:restart', $this->currentTime());
$this->info('Broadcasting queue restart signal.');
}

View File

@@ -4,16 +4,15 @@ namespace Illuminate\Queue\Console;
use Illuminate\Support\Arr;
use Illuminate\Console\Command;
use Symfony\Component\Console\Input\InputArgument;
class RetryCommand extends Command
{
/**
* The console command name.
* The console command signature.
*
* @var string
*/
protected $name = 'queue:retry';
protected $signature = 'queue:retry {id* : The ID of the failed job or "all" to retry all jobs.}';
/**
* The console command description.
@@ -27,48 +26,57 @@ class RetryCommand extends Command
*
* @return void
*/
public function fire()
public function handle()
{
$ids = $this->argument('id');
foreach ($this->getJobIds() as $id) {
$job = $this->laravel['queue.failer']->find($id);
if (is_null($job)) {
$this->error("Unable to find failed job with ID [{$id}].");
} else {
$this->retryJob($job);
$this->info("The failed job [{$id}] has been pushed back onto the queue!");
$this->laravel['queue.failer']->forget($id);
}
}
}
/**
* Get the job IDs to be retried.
*
* @return array
*/
protected function getJobIds()
{
$ids = (array) $this->argument('id');
if (count($ids) === 1 && $ids[0] === 'all') {
$ids = Arr::pluck($this->laravel['queue.failer']->all(), 'id');
}
foreach ($ids as $id) {
$this->retryJob($id);
}
return $ids;
}
/**
* Retry the queue job with the given ID.
* Retry the queue job.
*
* @param string $id
* @param \stdClass $job
* @return void
*/
protected function retryJob($id)
protected function retryJob($job)
{
$failed = $this->laravel['queue.failer']->find($id);
if (! is_null($failed)) {
$failed = (object) $failed;
$failed->payload = $this->resetAttempts($failed->payload);
$this->laravel['queue']->connection($failed->connection)
->pushRaw($failed->payload, $failed->queue);
$this->laravel['queue.failer']->forget($failed->id);
$this->info("The failed job [{$id}] has been pushed back onto the queue!");
} else {
$this->error("No failed job matches the given ID [{$id}].");
}
$this->laravel['queue']->connection($job->connection)->pushRaw(
$this->resetAttempts($job->payload), $job->queue
);
}
/**
* Reset the payload attempts.
*
* Applicable to Redis jobs which store attempts in their payload.
*
* @param string $payload
* @return string
*/
@@ -77,21 +85,9 @@ class RetryCommand extends Command
$payload = json_decode($payload, true);
if (isset($payload['attempts'])) {
$payload['attempts'] = 1;
$payload['attempts'] = 0;
}
return json_encode($payload);
}
/**
* Get the console command arguments.
*
* @return array
*/
protected function getArguments()
{
return [
['id', InputArgument::IS_ARRAY, 'The ID of the failed job'],
];
}
}

View File

@@ -55,20 +55,14 @@ class TableCommand extends Command
*
* @return void
*/
public function fire()
public function handle()
{
$table = $this->laravel['config']['queue.connections.database.table'];
$tableClassName = Str::studly($table);
$fullPath = $this->createBaseMigration($table);
$stub = str_replace(
['{{table}}', '{{tableClassName}}'], [$table, $tableClassName], $this->files->get(__DIR__.'/stubs/jobs.stub')
$this->replaceMigration(
$this->createBaseMigration($table), $table, Str::studly($table)
);
$this->files->put($fullPath, $stub);
$this->info('Migration created successfully!');
$this->composer->dumpAutoloads();
@@ -82,10 +76,27 @@ class TableCommand extends Command
*/
protected function createBaseMigration($table = 'jobs')
{
$name = 'create_'.$table.'_table';
return $this->laravel['migration.creator']->create(
'create_'.$table.'_table', $this->laravel->databasePath().'/migrations'
);
}
$path = $this->laravel->databasePath().'/migrations';
/**
* Replace the generated migration with the job table stub.
*
* @param string $path
* @param string $table
* @param string $tableClassName
* @return void
*/
protected function replaceMigration($path, $table, $tableClassName)
{
$stub = str_replace(
['{{table}}', '{{tableClassName}}'],
[$table, $tableClassName],
$this->files->get(__DIR__.'/stubs/jobs.stub')
);
return $this->laravel['migration.creator']->create($name, $path);
$this->files->put($path, $stub);
}
}

View File

@@ -2,14 +2,14 @@
namespace Illuminate\Queue\Console;
use Carbon\Carbon;
use Illuminate\Queue\Worker;
use Illuminate\Support\Carbon;
use Illuminate\Console\Command;
use Illuminate\Contracts\Queue\Job;
use Illuminate\Queue\WorkerOptions;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Events\JobProcessed;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputArgument;
use Illuminate\Queue\Events\JobProcessing;
class WorkCommand extends Command
{
@@ -18,14 +18,24 @@ class WorkCommand extends Command
*
* @var string
*/
protected $name = 'queue:work';
protected $signature = 'queue:work
{connection? : The name of the queue connection to work}
{--queue= : The names of the queues to work}
{--daemon : Run the worker in daemon mode (Deprecated)}
{--once : Only process the next job on the queue}
{--delay=0 : The number of seconds to delay failed jobs}
{--force : Force the worker to run even in maintenance mode}
{--memory=128 : The memory limit in megabytes}
{--sleep=3 : Number of seconds to sleep when no job is available}
{--timeout=60 : The number of seconds a child process can run}
{--tries=0 : Number of times to attempt a job before logging it failed}';
/**
* The console command description.
*
* @var string
*/
protected $description = 'Process the next job on a queue';
protected $description = 'Start processing jobs on the queue as a daemon';
/**
* The queue worker instance.
@@ -35,7 +45,7 @@ class WorkCommand extends Command
protected $worker;
/**
* Create a new queue listen command.
* Create a new queue work command.
*
* @param \Illuminate\Queue\Worker $worker
* @return void
@@ -52,9 +62,9 @@ class WorkCommand extends Command
*
* @return void
*/
public function fire()
public function handle()
{
if ($this->downForMaintenance() && ! $this->option('daemon')) {
if ($this->downForMaintenance() && $this->option('once')) {
return $this->worker->sleep($this->option('sleep'));
}
@@ -63,19 +73,46 @@ class WorkCommand extends Command
// which jobs are coming through a queue and be informed on its progress.
$this->listenForEvents();
$queue = $this->option('queue');
$connection = $this->argument('connection')
?: $this->laravel['config']['queue.default'];
$delay = $this->option('delay');
// The memory limit is the amount of memory we will allow the script to occupy
// before killing it and letting a process manager restart it for us, which
// is to protect us against any memory leaks that will be in the scripts.
$memory = $this->option('memory');
$connection = $this->argument('connection');
// We need to get the right queue for the connection which is set in the queue
// configuration file for the application. We will pull it based on the set
// connection being run for the queue operation currently being executed.
$queue = $this->getQueue($connection);
$this->runWorker(
$connection, $queue, $delay, $memory, $this->option('daemon')
$connection, $queue
);
}
/**
* Run the worker instance.
*
* @param string $connection
* @param string $queue
* @return array
*/
protected function runWorker($connection, $queue)
{
$this->worker->setCache($this->laravel['cache']->driver());
return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
$connection, $queue, $this->gatherWorkerOptions()
);
}
/**
* Gather all of the queue worker options as a single object.
*
* @return \Illuminate\Queue\WorkerOptions
*/
protected function gatherWorkerOptions()
{
return new WorkerOptions(
$this->option('delay'), $this->option('memory'),
$this->option('timeout'), $this->option('sleep'),
$this->option('tries'), $this->option('force')
);
}
@@ -86,62 +123,84 @@ class WorkCommand extends Command
*/
protected function listenForEvents()
{
$this->laravel['events']->listen(JobProcessing::class, function ($event) {
$this->writeOutput($event->job, 'starting');
});
$this->laravel['events']->listen(JobProcessed::class, function ($event) {
$this->writeOutput($event->job, false);
$this->writeOutput($event->job, 'success');
});
$this->laravel['events']->listen(JobFailed::class, function ($event) {
$this->writeOutput($event->job, true);
$this->writeOutput($event->job, 'failed');
$this->logFailedJob($event);
});
}
/**
* Run the worker instance.
*
* @param string $connection
* @param string $queue
* @param int $delay
* @param int $memory
* @param bool $daemon
* @return array
*/
protected function runWorker($connection, $queue, $delay, $memory, $daemon = false)
{
$this->worker->setDaemonExceptionHandler(
$this->laravel['Illuminate\Contracts\Debug\ExceptionHandler']
);
if ($daemon) {
$this->worker->setCache($this->laravel['cache']->driver());
return $this->worker->daemon(
$connection, $queue, $delay, $memory,
$this->option('sleep'), $this->option('tries')
);
}
return $this->worker->pop(
$connection, $queue, $delay,
$this->option('sleep'), $this->option('tries')
);
}
/**
* Write the status output for the queue worker.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param bool $failed
* @param string $status
* @return void
*/
protected function writeOutput(Job $job, $failed)
protected function writeOutput(Job $job, $status)
{
if ($failed) {
$this->output->writeln('<error>['.Carbon::now()->format('Y-m-d H:i:s').'] Failed:</error> '.$job->resolveName());
} else {
$this->output->writeln('<info>['.Carbon::now()->format('Y-m-d H:i:s').'] Processed:</info> '.$job->resolveName());
switch ($status) {
case 'starting':
return $this->writeStatus($job, 'Processing', 'comment');
case 'success':
return $this->writeStatus($job, 'Processed', 'info');
case 'failed':
return $this->writeStatus($job, 'Failed', 'error');
}
}
/**
* Format the status output for the queue worker.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param string $status
* @param string $type
* @return void
*/
protected function writeStatus(Job $job, $status, $type)
{
$this->output->writeln(sprintf(
"<{$type}>[%s] %s</{$type}> %s",
Carbon::now()->format('Y-m-d H:i:s'),
str_pad("{$status}:", 11), $job->resolveName()
));
}
/**
* Store a failed job event.
*
* @param \Illuminate\Queue\Events\JobFailed $event
* @return void
*/
protected function logFailedJob(JobFailed $event)
{
$this->laravel['queue.failer']->log(
$event->connectionName, $event->job->getQueue(),
$event->job->getRawBody(), $event->exception
);
}
/**
* Get the queue name for the worker.
*
* @param string $connection
* @return string
*/
protected function getQueue($connection)
{
return $this->option('queue') ?: $this->laravel['config']->get(
"queue.connections.{$connection}.queue", 'default'
);
}
/**
* Determine if the worker should run in maintenance mode.
*
@@ -149,46 +208,6 @@ class WorkCommand extends Command
*/
protected function downForMaintenance()
{
if ($this->option('force')) {
return false;
}
return $this->laravel->isDownForMaintenance();
}
/**
* Get the console command arguments.
*
* @return array
*/
protected function getArguments()
{
return [
['connection', InputArgument::OPTIONAL, 'The name of connection', null],
];
}
/**
* Get the console command options.
*
* @return array
*/
protected function getOptions()
{
return [
['queue', null, InputOption::VALUE_OPTIONAL, 'The queue to listen on'],
['daemon', null, InputOption::VALUE_NONE, 'Run the worker in daemon mode'],
['delay', null, InputOption::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0],
['force', null, InputOption::VALUE_NONE, 'Force the worker to run even in maintenance mode'],
['memory', null, InputOption::VALUE_OPTIONAL, 'The memory limit in megabytes', 128],
['sleep', null, InputOption::VALUE_OPTIONAL, 'Number of seconds to sleep when no job is available', 3],
['tries', null, InputOption::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0],
];
return $this->option('force') ? false : $this->laravel->isDownForMaintenance();
}
}

View File

@@ -1,5 +1,6 @@
<?php
use Illuminate\Support\Facades\Schema;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Database\Migrations\Migration;
@@ -13,10 +14,11 @@ class Create{{tableClassName}}Table extends Migration
public function up()
{
Schema::create('{{table}}', function (Blueprint $table) {
$table->increments('id');
$table->bigIncrements('id');
$table->text('connection');
$table->text('queue');
$table->longText('payload');
$table->longText('exception');
$table->timestamp('failed_at')->useCurrent();
});
}
@@ -28,6 +30,6 @@ class Create{{tableClassName}}Table extends Migration
*/
public function down()
{
Schema::drop('{{table}}');
Schema::dropIfExists('{{table}}');
}
}

View File

@@ -1,5 +1,6 @@
<?php
use Illuminate\Support\Facades\Schema;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Database\Migrations\Migration;
@@ -14,14 +15,12 @@ class Create{{tableClassName}}Table extends Migration
{
Schema::create('{{table}}', function (Blueprint $table) {
$table->bigIncrements('id');
$table->string('queue');
$table->string('queue')->index();
$table->longText('payload');
$table->tinyInteger('attempts')->unsigned();
$table->tinyInteger('reserved')->unsigned();
$table->unsignedTinyInteger('attempts');
$table->unsignedInteger('reserved_at')->nullable();
$table->unsignedInteger('available_at');
$table->unsignedInteger('created_at');
$table->index(['queue', 'reserved', 'reserved_at']);
});
}
@@ -32,6 +31,6 @@ class Create{{tableClassName}}Table extends Migration
*/
public function down()
{
Schema::drop('{{table}}');
Schema::dropIfExists('{{table}}');
}
}

View File

@@ -1,61 +0,0 @@
<?php
namespace Illuminate\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Console\RetryCommand;
use Illuminate\Queue\Console\ListFailedCommand;
use Illuminate\Queue\Console\FlushFailedCommand;
use Illuminate\Queue\Console\ForgetFailedCommand;
class ConsoleServiceProvider extends ServiceProvider
{
/**
* Indicates if loading of the provider is deferred.
*
* @var bool
*/
protected $defer = true;
/**
* Register the service provider.
*
* @return void
*/
public function register()
{
$this->app->singleton('command.queue.failed', function () {
return new ListFailedCommand;
});
$this->app->singleton('command.queue.retry', function () {
return new RetryCommand;
});
$this->app->singleton('command.queue.forget', function () {
return new ForgetFailedCommand;
});
$this->app->singleton('command.queue.flush', function () {
return new FlushFailedCommand;
});
$this->commands(
'command.queue.failed', 'command.queue.retry',
'command.queue.forget', 'command.queue.flush'
);
}
/**
* Get the services provided by the provider.
*
* @return array
*/
public function provides()
{
return [
'command.queue.failed', 'command.queue.retry',
'command.queue.forget', 'command.queue.flush',
];
}
}

View File

@@ -2,10 +2,11 @@
namespace Illuminate\Queue;
use DateTime;
use Carbon\Carbon;
use Throwable;
use Illuminate\Support\Carbon;
use Illuminate\Database\Connection;
use Illuminate\Queue\Jobs\DatabaseJob;
use Illuminate\Queue\Jobs\DatabaseJobRecord;
use Illuminate\Contracts\Queue\Queue as QueueContract;
class DatabaseQueue extends Queue implements QueueContract
@@ -36,7 +37,7 @@ class DatabaseQueue extends Queue implements QueueContract
*
* @var int|null
*/
protected $expire = 60;
protected $retryAfter = 60;
/**
* Create a new database queue instance.
@@ -44,15 +45,28 @@ class DatabaseQueue extends Queue implements QueueContract
* @param \Illuminate\Database\Connection $database
* @param string $table
* @param string $default
* @param int $expire
* @param int $retryAfter
* @return void
*/
public function __construct(Connection $database, $table, $default = 'default', $expire = 60)
public function __construct(Connection $database, $table, $default = 'default', $retryAfter = 60)
{
$this->table = $table;
$this->expire = $expire;
$this->default = $default;
$this->database = $database;
$this->retryAfter = $retryAfter;
}
/**
* Get the size of the queue.
*
* @param string $queue
* @return int
*/
public function size($queue = null)
{
return $this->database->table($this->table)
->where('queue', $this->getQueue($queue))
->count();
}
/**
@@ -65,7 +79,7 @@ class DatabaseQueue extends Queue implements QueueContract
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushToDatabase(0, $queue, $this->createPayload($job, $data));
return $this->pushToDatabase($queue, $this->createPayload($job, $data));
}
/**
@@ -78,13 +92,13 @@ class DatabaseQueue extends Queue implements QueueContract
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
return $this->pushToDatabase(0, $queue, $payload);
return $this->pushToDatabase($queue, $payload);
}
/**
* Push a new job onto the queue after a delay.
*
* @param \DateTime|int $delay
* @param \DateTimeInterface|\DateInterval|int $delay
* @param string $job
* @param mixed $data
* @param string $queue
@@ -92,7 +106,7 @@ class DatabaseQueue extends Queue implements QueueContract
*/
public function later($delay, $job, $data = '', $queue = null)
{
return $this->pushToDatabase($delay, $queue, $this->createPayload($job, $data));
return $this->pushToDatabase($queue, $this->createPayload($job, $data), $delay);
}
/**
@@ -107,174 +121,42 @@ class DatabaseQueue extends Queue implements QueueContract
{
$queue = $this->getQueue($queue);
$availableAt = $this->getAvailableAt(0);
$availableAt = $this->availableAt();
$records = array_map(function ($job) use ($queue, $data, $availableAt) {
return $this->buildDatabaseRecord(
$queue, $this->createPayload($job, $data), $availableAt
);
}, (array) $jobs);
return $this->database->table($this->table)->insert($records);
return $this->database->table($this->table)->insert(collect((array) $jobs)->map(
function ($job) use ($queue, $data, $availableAt) {
return $this->buildDatabaseRecord($queue, $this->createPayload($job, $data), $availableAt);
}
)->all());
}
/**
* Release a reserved job back onto the queue.
*
* @param string $queue
* @param \StdClass $job
* @param \Illuminate\Queue\Jobs\DatabaseJobRecord $job
* @param int $delay
* @return mixed
*/
public function release($queue, $job, $delay)
{
return $this->pushToDatabase($delay, $queue, $job->payload, $job->attempts);
return $this->pushToDatabase($queue, $job->payload, $delay, $job->attempts);
}
/**
* Push a raw payload to the database with a given delay.
*
* @param \DateTime|int $delay
* @param string|null $queue
* @param string $payload
* @param \DateTimeInterface|\DateInterval|int $delay
* @param int $attempts
* @return mixed
*/
protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0)
{
$attributes = $this->buildDatabaseRecord(
$this->getQueue($queue), $payload, $this->getAvailableAt($delay), $attempts
);
return $this->database->table($this->table)->insertGetId($attributes);
}
/**
* Pop the next job off of the queue.
*
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
*/
public function pop($queue = null)
{
$queue = $this->getQueue($queue);
$this->database->beginTransaction();
if ($job = $this->getNextAvailableJob($queue)) {
$job = $this->markJobAsReserved($job);
$this->database->commit();
return new DatabaseJob(
$this->container, $this, $job, $queue
);
}
$this->database->commit();
}
/**
* Get the next available job for the queue.
*
* @param string|null $queue
* @return \StdClass|null
*/
protected function getNextAvailableJob($queue)
{
$job = $this->database->table($this->table)
->lockForUpdate()
->where('queue', $this->getQueue($queue))
->where(function ($query) {
$this->isAvailable($query);
$this->isReservedButExpired($query);
})
->orderBy('id', 'asc')
->first();
return $job ? (object) $job : null;
}
/**
* Modify the query to check for available jobs.
*
* @param \Illuminate\Database\Query\Builder $query
* @return void
*/
protected function isAvailable($query)
{
$query->where(function ($query) {
$query->where('reserved', 0);
$query->where('available_at', '<=', $this->getTime());
});
}
/**
* Modify the query to check for jobs that are reserved but have expired.
*
* @param \Illuminate\Database\Query\Builder $query
* @return void
*/
protected function isReservedButExpired($query)
{
$expiration = Carbon::now()->subSeconds($this->expire)->getTimestamp();
$query->orWhere(function ($query) use ($expiration) {
$query->where('reserved', 1);
$query->where('reserved_at', '<=', $expiration);
});
}
/**
* Mark the given job ID as reserved.
*
* @param \stdClass $job
* @return \stdClass
*/
protected function markJobAsReserved($job)
{
$job->reserved = 1;
$job->attempts = $job->attempts + 1;
$job->reserved_at = $this->getTime();
$this->database->table($this->table)->where('id', $job->id)->update([
'reserved' => $job->reserved,
'reserved_at' => $job->reserved_at,
'attempts' => $job->attempts,
]);
return $job;
}
/**
* Delete a reserved job from the queue.
*
* @param string $queue
* @param string $id
* @return void
*/
public function deleteReserved($queue, $id)
{
$this->database->beginTransaction();
if ($this->database->table($this->table)->lockForUpdate()->find($id)) {
$this->database->table($this->table)->where('id', $id)->delete();
}
$this->database->commit();
}
/**
* Get the "available at" UNIX timestamp.
*
* @param \DateTime|int $delay
* @return int
*/
protected function getAvailableAt($delay)
{
$availableAt = $delay instanceof DateTime ? $delay : Carbon::now()->addSeconds($delay);
return $availableAt->getTimestamp();
return $this->database->table($this->table)->insertGetId($this->buildDatabaseRecord(
$this->getQueue($queue), $payload, $this->availableAt($delay), $attempts
));
}
/**
@@ -291,21 +173,147 @@ class DatabaseQueue extends Queue implements QueueContract
return [
'queue' => $queue,
'attempts' => $attempts,
'reserved' => 0,
'reserved_at' => null,
'available_at' => $availableAt,
'created_at' => $this->getTime(),
'created_at' => $this->currentTime(),
'payload' => $payload,
];
}
/**
* Pop the next job off of the queue.
*
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
* @throws \Exception|\Throwable
*/
public function pop($queue = null)
{
$queue = $this->getQueue($queue);
try {
$this->database->beginTransaction();
if ($job = $this->getNextAvailableJob($queue)) {
return $this->marshalJob($queue, $job);
}
$this->database->commit();
} catch (Throwable $e) {
$this->database->rollBack();
throw $e;
}
}
/**
* Get the next available job for the queue.
*
* @param string|null $queue
* @return \Illuminate\Queue\Jobs\DatabaseJobRecord|null
*/
protected function getNextAvailableJob($queue)
{
$job = $this->database->table($this->table)
->lockForUpdate()
->where('queue', $this->getQueue($queue))
->where(function ($query) {
$this->isAvailable($query);
$this->isReservedButExpired($query);
})
->orderBy('id', 'asc')
->first();
return $job ? new DatabaseJobRecord((object) $job) : null;
}
/**
* Modify the query to check for available jobs.
*
* @param \Illuminate\Database\Query\Builder $query
* @return void
*/
protected function isAvailable($query)
{
$query->where(function ($query) {
$query->whereNull('reserved_at')
->where('available_at', '<=', $this->currentTime());
});
}
/**
* Modify the query to check for jobs that are reserved but have expired.
*
* @param \Illuminate\Database\Query\Builder $query
* @return void
*/
protected function isReservedButExpired($query)
{
$expiration = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp();
$query->orWhere(function ($query) use ($expiration) {
$query->where('reserved_at', '<=', $expiration);
});
}
/**
* Marshal the reserved job into a DatabaseJob instance.
*
* @param string $queue
* @param \Illuminate\Queue\Jobs\DatabaseJobRecord $job
* @return \Illuminate\Queue\Jobs\DatabaseJob
*/
protected function marshalJob($queue, $job)
{
$job = $this->markJobAsReserved($job);
$this->database->commit();
return new DatabaseJob(
$this->container, $this, $job, $this->connectionName, $queue
);
}
/**
* Mark the given job ID as reserved.
*
* @param \Illuminate\Queue\Jobs\DatabaseJobRecord $job
* @return \Illuminate\Queue\Jobs\DatabaseJobRecord
*/
protected function markJobAsReserved($job)
{
$this->database->table($this->table)->where('id', $job->id)->update([
'reserved_at' => $job->touch(),
'attempts' => $job->increment(),
]);
return $job;
}
/**
* Delete a reserved job from the queue.
*
* @param string $queue
* @param string $id
* @return void
* @throws \Exception|\Throwable
*/
public function deleteReserved($queue, $id)
{
$this->database->transaction(function () use ($id) {
if ($this->database->table($this->table)->lockForUpdate()->find($id)) {
$this->database->table($this->table)->where('id', $id)->delete();
}
});
}
/**
* Get the queue or return the default.
*
* @param string|null $queue
* @return string
*/
protected function getQueue($queue)
public function getQueue($queue)
{
return $queue ?: $this->default;
}
@@ -319,25 +327,4 @@ class DatabaseQueue extends Queue implements QueueContract
{
return $this->database;
}
/**
* Get the expiration time in seconds.
*
* @return int|null
*/
public function getExpire()
{
return $this->expire;
}
/**
* Set the expiration time in seconds.
*
* @param int|null $seconds
* @return void
*/
public function setExpire($seconds)
{
$this->expire = $seconds;
}
}

View File

@@ -18,17 +18,10 @@ class JobExceptionOccurred
*/
public $job;
/**
* The data given to the job.
*
* @var array
*/
public $data;
/**
* The exception instance.
*
* @var \Throwable
* @var \Exception
*/
public $exception;
@@ -37,14 +30,12 @@ class JobExceptionOccurred
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param array $data
* @param \Throwable $exception
* @param \Exception $exception
* @return void
*/
public function __construct($connectionName, $job, $data, $exception)
public function __construct($connectionName, $job, $exception)
{
$this->job = $job;
$this->data = $data;
$this->exception = $exception;
$this->connectionName = $connectionName;
}

View File

@@ -19,33 +19,24 @@ class JobFailed
public $job;
/**
* The data given to the job.
* The exception that caused the job to fail.
*
* @var array
* @var \Exception
*/
public $data;
/**
* The ID of the entry in the failed jobs table.
*
* @var int|null
*/
public $failedId;
public $exception;
/**
* Create a new event instance.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param array $data
* @param int|null $failedId
* @param \Exception $exception
* @return void
*/
public function __construct($connectionName, $job, $data, $failedId = null)
public function __construct($connectionName, $job, $exception)
{
$this->job = $job;
$this->data = $data;
$this->failedId = $failedId;
$this->exception = $exception;
$this->connectionName = $connectionName;
}
}

View File

@@ -18,25 +18,16 @@ class JobProcessed
*/
public $job;
/**
* The data given to the job.
*
* @var array
*/
public $data;
/**
* Create a new event instance.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param array $data
* @return void
*/
public function __construct($connectionName, $job, $data)
public function __construct($connectionName, $job)
{
$this->job = $job;
$this->data = $data;
$this->connectionName = $connectionName;
}
}

View File

@@ -18,25 +18,16 @@ class JobProcessing
*/
public $job;
/**
* The data given to the job.
*
* @var array
*/
public $data;
/**
* Create a new event instance.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param array $data
* @return void
*/
public function __construct($connectionName, $job, $data)
public function __construct($connectionName, $job)
{
$this->job = $job;
$this->data = $data;
$this->connectionName = $connectionName;
}
}

View File

@@ -0,0 +1,33 @@
<?php
namespace Illuminate\Queue\Events;
class Looping
{
/**
* The connection name.
*
* @var string
*/
public $connectionName;
/**
* The queue name.
*
* @var string
*/
public $queue;
/**
* Create a new event instance.
*
* @param string $connectionName
* @param string $queue
* @return void
*/
public function __construct($connectionName, $queue)
{
$this->queue = $queue;
$this->connectionName = $connectionName;
}
}

View File

@@ -2,7 +2,7 @@
namespace Illuminate\Queue\Failed;
use Carbon\Carbon;
use Illuminate\Support\Carbon;
use Illuminate\Database\ConnectionResolverInterface;
class DatabaseFailedJobProvider implements FailedJobProviderInterface
@@ -49,13 +49,18 @@ class DatabaseFailedJobProvider implements FailedJobProviderInterface
* @param string $connection
* @param string $queue
* @param string $payload
* @param \Exception $exception
* @return int|null
*/
public function log($connection, $queue, $payload)
public function log($connection, $queue, $payload, $exception)
{
$failed_at = Carbon::now();
return $this->getTable()->insertGetId(compact('connection', 'queue', 'payload', 'failed_at'));
$exception = (string) $exception;
return $this->getTable()->insertGetId(compact(
'connection', 'queue', 'payload', 'exception', 'failed_at'
));
}
/**
@@ -65,14 +70,14 @@ class DatabaseFailedJobProvider implements FailedJobProviderInterface
*/
public function all()
{
return $this->getTable()->orderBy('id', 'desc')->get();
return $this->getTable()->orderBy('id', 'desc')->get()->all();
}
/**
* Get a single failed job.
*
* @param mixed $id
* @return array
* @return object|null
*/
public function find($id)
{

View File

@@ -10,9 +10,10 @@ interface FailedJobProviderInterface
* @param string $connection
* @param string $queue
* @param string $payload
* @param \Exception $exception
* @return int|null
*/
public function log($connection, $queue, $payload);
public function log($connection, $queue, $payload, $exception);
/**
* Get a list of all of the failed jobs.
@@ -25,7 +26,7 @@ interface FailedJobProviderInterface
* Get a single failed job.
*
* @param mixed $id
* @return array
* @return object|null
*/
public function find($id);

View File

@@ -10,9 +10,10 @@ class NullFailedJobProvider implements FailedJobProviderInterface
* @param string $connection
* @param string $queue
* @param string $payload
* @param \Exception $exception
* @return int|null
*/
public function log($connection, $queue, $payload)
public function log($connection, $queue, $payload, $exception)
{
//
}
@@ -31,7 +32,7 @@ class NullFailedJobProvider implements FailedJobProviderInterface
* Get a single failed job.
*
* @param mixed $id
* @return array
* @return object|null
*/
public function find($id)
{

View File

@@ -0,0 +1,50 @@
<?php
namespace Illuminate\Queue;
use Illuminate\Container\Container;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Contracts\Events\Dispatcher;
class FailingJob
{
/**
* Delete the job, call the "failed" method, and raise the failed job event.
*
* @param string $connectionName
* @param \Illuminate\Queue\Jobs\Job $job
* @param \Exception $e
* @return void
*/
public static function handle($connectionName, $job, $e = null)
{
$job->markAsFailed();
if ($job->isDeleted()) {
return;
}
try {
// If the job has failed, we will delete it, call the "failed" method and then call
// an event indicating the job has failed so it can be logged if needed. This is
// to allow every developer to better keep monitor of their failed queue jobs.
$job->delete();
$job->failed($e);
} finally {
static::events()->dispatch(new JobFailed(
$connectionName, $job, $e ?: new ManuallyFailedException
));
}
}
/**
* Get the event dispatcher instance.
*
* @return \Illuminate\Contracts\Events\Dispatcher
*/
protected static function events()
{
return Container::getInstance()->make(Dispatcher::class);
}
}

View File

@@ -1,38 +0,0 @@
<?php
use Illuminate\Contracts\Encryption\Encrypter as EncrypterContract;
class IlluminateQueueClosure
{
/**
* The encrypter instance.
*
* @var \Illuminate\Contracts\Encryption\Encrypter
*/
protected $crypt;
/**
* Create a new queued Closure job.
*
* @param \Illuminate\Contracts\Encryption\Encrypter $crypt
* @return void
*/
public function __construct(EncrypterContract $crypt)
{
$this->crypt = $crypt;
}
/**
* Fire the Closure based queue job.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param array $data
* @return void
*/
public function fire($job, $data)
{
$closure = unserialize($this->crypt->decrypt($data['closure']));
$closure($job);
}
}

View File

@@ -38,12 +38,13 @@ trait InteractsWithQueue
/**
* Fail the job from the queue.
*
* @param \Throwable $exception
* @return void
*/
public function failed()
public function fail($exception = null)
{
if ($this->job) {
return $this->job->failed();
FailingJob::handle($this->job->getConnectionName(), $this->job, $exception);
}
}

View File

@@ -0,0 +1,19 @@
<?php
namespace Illuminate\Queue;
use InvalidArgumentException;
class InvalidPayloadException extends InvalidArgumentException
{
/**
* Create a new exception instance.
*
* @param string|null $message
* @return void
*/
public function __construct($message = null)
{
parent::__construct($message ?: json_last_error());
}
}

View File

@@ -29,50 +29,17 @@ class BeanstalkdJob extends Job implements JobContract
* @param \Illuminate\Container\Container $container
* @param \Pheanstalk\Pheanstalk $pheanstalk
* @param \Pheanstalk\Job $job
* @param string $connectionName
* @param string $queue
* @return void
*/
public function __construct(Container $container,
Pheanstalk $pheanstalk,
PheanstalkJob $job,
$queue)
public function __construct(Container $container, Pheanstalk $pheanstalk, PheanstalkJob $job, $connectionName, $queue)
{
$this->job = $job;
$this->queue = $queue;
$this->container = $container;
$this->pheanstalk = $pheanstalk;
}
/**
* Fire the job.
*
* @return void
*/
public function fire()
{
$this->resolveAndFire(json_decode($this->getRawBody(), true));
}
/**
* Get the raw body string for the job.
*
* @return string
*/
public function getRawBody()
{
return $this->job->getData();
}
/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
parent::delete();
$this->pheanstalk->delete($this->job);
$this->connectionName = $connectionName;
}
/**
@@ -102,6 +69,18 @@ class BeanstalkdJob extends Job implements JobContract
$this->pheanstalk->bury($this->job);
}
/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
parent::delete();
$this->pheanstalk->delete($this->job);
}
/**
* Get the number of times the job has been attempted.
*
@@ -117,7 +96,7 @@ class BeanstalkdJob extends Job implements JobContract
/**
* Get the job identifier.
*
* @return string
* @return int
*/
public function getJobId()
{
@@ -125,13 +104,13 @@ class BeanstalkdJob extends Job implements JobContract
}
/**
* Get the IoC container instance.
* Get the raw body string for the job.
*
* @return \Illuminate\Container\Container
* @return string
*/
public function getContainer()
public function getRawBody()
{
return $this->container;
return $this->job->getData();
}
/**

View File

@@ -2,8 +2,8 @@
namespace Illuminate\Queue\Jobs;
use Illuminate\Queue\DatabaseQueue;
use Illuminate\Container\Container;
use Illuminate\Queue\DatabaseQueue;
use Illuminate\Contracts\Queue\Job as JobContract;
class DatabaseJob extends Job implements JobContract
@@ -18,7 +18,7 @@ class DatabaseJob extends Job implements JobContract
/**
* The database job payload.
*
* @var \StdClass
* @var \stdClass
*/
protected $job;
@@ -27,26 +27,33 @@ class DatabaseJob extends Job implements JobContract
*
* @param \Illuminate\Container\Container $container
* @param \Illuminate\Queue\DatabaseQueue $database
* @param \StdClass $job
* @param \stdClass $job
* @param string $connectionName
* @param string $queue
* @return void
*/
public function __construct(Container $container, DatabaseQueue $database, $job, $queue)
public function __construct(Container $container, DatabaseQueue $database, $job, $connectionName, $queue)
{
$this->job = $job;
$this->queue = $queue;
$this->database = $database;
$this->container = $container;
$this->connectionName = $connectionName;
}
/**
* Fire the job.
* Release the job back into the queue.
*
* @return void
* @param int $delay
* @return mixed
*/
public function fire()
public function release($delay = 0)
{
$this->resolveAndFire(json_decode($this->job->payload, true));
parent::release($delay);
$this->delete();
return $this->database->release($this->queue, $this->job, $delay);
}
/**
@@ -61,21 +68,6 @@ class DatabaseJob extends Job implements JobContract
$this->database->deleteReserved($this->queue, $this->job->id);
}
/**
* Release the job back into the queue.
*
* @param int $delay
* @return void
*/
public function release($delay = 0)
{
parent::release($delay);
$this->delete();
$this->database->release($this->queue, $this->job, $delay);
}
/**
* Get the number of times the job has been attempted.
*
@@ -105,34 +97,4 @@ class DatabaseJob extends Job implements JobContract
{
return $this->job->payload;
}
/**
* Get the IoC container instance.
*
* @return \Illuminate\Container\Container
*/
public function getContainer()
{
return $this->container;
}
/**
* Get the underlying queue driver instance.
*
* @return \Illuminate\Queue\DatabaseQueue
*/
public function getDatabaseQueue()
{
return $this->database;
}
/**
* Get the underlying database job.
*
* @return \StdClass
*/
public function getDatabaseJob()
{
return $this->job;
}
}

View File

@@ -0,0 +1,63 @@
<?php
namespace Illuminate\Queue\Jobs;
use Illuminate\Support\InteractsWithTime;
class DatabaseJobRecord
{
use InteractsWithTime;
/**
* The underlying job record.
*
* @var \stdClass
*/
protected $record;
/**
* Create a new job record instance.
*
* @param \stdClass $record
* @return void
*/
public function __construct($record)
{
$this->record = $record;
}
/**
* Increment the number of times the job has been attempted.
*
* @return int
*/
public function increment()
{
$this->record->attempts++;
return $this->record->attempts;
}
/**
* Update the "reserved at" timestamp of the job.
*
* @return int
*/
public function touch()
{
$this->record->reserved_at = $this->currentTime();
return $this->record->reserved_at;
}
/**
* Dynamically access the underlying job information.
*
* @param string $key
* @return mixed
*/
public function __get($key)
{
return $this->record->{$key};
}
}

View File

@@ -2,12 +2,12 @@
namespace Illuminate\Queue\Jobs;
use DateTime;
use Illuminate\Support\Arr;
use Illuminate\Support\Str;
use Illuminate\Support\InteractsWithTime;
abstract class Job
{
use InteractsWithTime;
/**
* The job handler instance.
*
@@ -22,13 +22,6 @@ abstract class Job
*/
protected $container;
/**
* The name of the queue the job belongs to.
*
* @var string
*/
protected $queue;
/**
* Indicates if the job has been deleted.
*
@@ -43,12 +36,45 @@ abstract class Job
*/
protected $released = false;
/**
* Indicates if the job has failed.
*
* @var bool
*/
protected $failed = false;
/**
* The name of the connection the job belongs to.
*/
protected $connectionName;
/**
* The name of the queue the job belongs to.
*
* @var string
*/
protected $queue;
/**
* Get the raw body of the job.
*
* @return string
*/
abstract public function getRawBody();
/**
* Fire the job.
*
* @return void
*/
abstract public function fire();
public function fire()
{
$payload = $this->payload();
list($class, $method) = JobName::parse($payload['job']);
($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
}
/**
* Delete the job from the queue.
@@ -102,49 +128,46 @@ abstract class Job
}
/**
* Get the number of times the job has been attempted.
* Determine if the job has been marked as a failure.
*
* @return int
* @return bool
*/
abstract public function attempts();
public function hasFailed()
{
return $this->failed;
}
/**
* Get the raw body string for the job.
* Mark the job as "failed".
*
* @return string
*/
abstract public function getRawBody();
/**
* Resolve and fire the job handler method.
*
* @param array $payload
* @return void
*/
protected function resolveAndFire(array $payload)
public function markAsFailed()
{
list($class, $method) = $this->parseJob($payload['job']);
$this->instance = $this->resolve($class);
$this->instance->{$method}($this, $this->resolveQueueableEntities($payload['data']));
$this->failed = true;
}
/**
* Parse the job declaration into class and method.
* Process an exception that caused the job to fail.
*
* @param string $job
* @return array
* @param \Exception $e
* @return void
*/
protected function parseJob($job)
public function failed($e)
{
$segments = explode('@', $job);
$this->markAsFailed();
return count($segments) > 1 ? $segments : [$segments[0], 'fire'];
$payload = $this->payload();
list($class, $method) = JobName::parse($payload['job']);
if (method_exists($this->instance = $this->resolve($class), 'failed')) {
$this->instance->failed($payload['data'], $e);
}
}
/**
* Resolve the given job handler.
* Resolve the given class.
*
* @param string $class
* @return mixed
@@ -155,98 +178,43 @@ abstract class Job
}
/**
* Resolve all of the queueable entities in the given payload.
* Get the decoded body of the job.
*
* @param mixed $data
* @return mixed
* @return array
*/
protected function resolveQueueableEntities($data)
public function payload()
{
if (is_string($data)) {
return $this->resolveQueueableEntity($data);
}
if (is_array($data)) {
$data = array_map(function ($d) {
if (is_array($d)) {
return $this->resolveQueueableEntities($d);
}
return $this->resolveQueueableEntity($d);
}, $data);
}
return $data;
return json_decode($this->getRawBody(), true);
}
/**
* Resolve a single queueable entity from the resolver.
* Get the number of times to attempt a job.
*
* @param mixed $value
* @return \Illuminate\Contracts\Queue\QueueableEntity
* @return int|null
*/
protected function resolveQueueableEntity($value)
public function maxTries()
{
if (is_string($value) && Str::startsWith($value, '::entity::')) {
list($marker, $type, $id) = explode('|', $value, 3);
return $this->getEntityResolver()->resolve($type, $id);
}
return $value;
return $this->payload()['maxTries'] ?? null;
}
/**
* Call the failed method on the job instance.
* Get the number of seconds the job can run.
*
* @return void
* @return int|null
*/
public function failed()
public function timeout()
{
$payload = json_decode($this->getRawBody(), true);
list($class, $method) = $this->parseJob($payload['job']);
$this->instance = $this->resolve($class);
if (method_exists($this->instance, 'failed')) {
$this->instance->failed($this->resolveQueueableEntities($payload['data']));
}
return $this->payload()['timeout'] ?? null;
}
/**
* Get an entity resolver instance.
* Get the timestamp indicating when the job should timeout.
*
* @return \Illuminate\Contracts\Queue\EntityResolver
* @return int|null
*/
protected function getEntityResolver()
public function timeoutAt()
{
return $this->container->make('Illuminate\Contracts\Queue\EntityResolver');
}
/**
* Calculate the number of seconds with the given delay.
*
* @param \DateTime|int $delay
* @return int
*/
protected function getSeconds($delay)
{
if ($delay instanceof DateTime) {
return max(0, $delay->getTimestamp() - $this->getTime());
}
return (int) $delay;
}
/**
* Get the current system time.
*
* @return int
*/
protected function getTime()
{
return time();
return $this->payload()['timeoutAt'] ?? null;
}
/**
@@ -256,29 +224,29 @@ abstract class Job
*/
public function getName()
{
return json_decode($this->getRawBody(), true)['job'];
return $this->payload()['job'];
}
/**
* Get the resolved name of the queued job class.
*
* Resolves the name of "wrapped" jobs such as class-based handlers.
*
* @return string
*/
public function resolveName()
{
$name = $this->getName();
return JobName::resolve($this->getName(), $this->payload());
}
$payload = json_decode($this->getRawBody(), true);
if ($name === 'Illuminate\Queue\CallQueuedHandler@call') {
return Arr::get($payload, 'data.commandName', $name);
}
if ($name === 'Illuminate\Events\CallQueuedHandler@call') {
return $payload['data']['class'].'@'.$payload['data']['method'];
}
return $name;
/**
* Get the name of the connection the job belongs to.
*
* @return string
*/
public function getConnectionName()
{
return $this->connectionName;
}
/**
@@ -290,4 +258,14 @@ abstract class Job
{
return $this->queue;
}
/**
* Get the service container instance.
*
* @return \Illuminate\Container\Container
*/
public function getContainer()
{
return $this->container;
}
}

View File

@@ -0,0 +1,35 @@
<?php
namespace Illuminate\Queue\Jobs;
use Illuminate\Support\Str;
class JobName
{
/**
* Parse the given job name into a class / method array.
*
* @param string $job
* @return array
*/
public static function parse($job)
{
return Str::parseCallback($job, 'fire');
}
/**
* Get the resolved name of the queued job class.
*
* @param string $name
* @param array $payload
* @return string
*/
public static function resolve($name, $payload)
{
if (! empty($payload['displayName'])) {
return $payload['displayName'];
}
return $name;
}
}

View File

@@ -2,7 +2,6 @@
namespace Illuminate\Queue\Jobs;
use Illuminate\Support\Arr;
use Illuminate\Queue\RedisQueue;
use Illuminate\Container\Container;
use Illuminate\Contracts\Queue\Job as JobContract;
@@ -17,37 +16,50 @@ class RedisJob extends Job implements JobContract
protected $redis;
/**
* The Redis job payload.
* The Redis raw job payload.
*
* @var string
*/
protected $job;
/**
* The JSON decoded version of "$job".
*
* @var array
*/
protected $decoded;
/**
* The Redis job payload inside the reserved queue.
*
* @var string
*/
protected $reserved;
/**
* Create a new job instance.
*
* @param \Illuminate\Container\Container $container
* @param \Illuminate\Queue\RedisQueue $redis
* @param string $job
* @param string $reserved
* @param string $connectionName
* @param string $queue
* @return void
*/
public function __construct(Container $container, RedisQueue $redis, $job, $queue)
public function __construct(Container $container, RedisQueue $redis, $job, $reserved, $connectionName, $queue)
{
// The $job variable is the original job JSON as it existed in the ready queue while
// the $reserved variable is the raw JSON in the reserved queue. The exact format
// of the reserved job is required in order for us to properly delete its data.
$this->job = $job;
$this->redis = $redis;
$this->queue = $queue;
$this->reserved = $reserved;
$this->container = $container;
}
$this->connectionName = $connectionName;
/**
* Fire the job.
*
* @return void
*/
public function fire()
{
$this->resolveAndFire(json_decode($this->getRawBody(), true));
$this->decoded = $this->payload();
}
/**
@@ -69,7 +81,7 @@ class RedisJob extends Job implements JobContract
{
parent::delete();
$this->redis->deleteReserved($this->queue, $this->job);
$this->redis->deleteReserved($this->queue, $this);
}
/**
@@ -82,9 +94,7 @@ class RedisJob extends Job implements JobContract
{
parent::release($delay);
$this->delete();
$this->redis->release($this->queue, $this->job, $delay, $this->attempts() + 1);
$this->redis->deleteAndRelease($this->queue, $this, $delay);
}
/**
@@ -94,7 +104,7 @@ class RedisJob extends Job implements JobContract
*/
public function attempts()
{
return Arr::get(json_decode($this->job, true), 'attempts');
return ($this->decoded['attempts'] ?? null) + 1;
}
/**
@@ -104,23 +114,13 @@ class RedisJob extends Job implements JobContract
*/
public function getJobId()
{
return Arr::get(json_decode($this->job, true), 'id');
return $this->decoded['id'] ?? null;
}
/**
* Get the IoC container instance.
* Get the underlying Redis factory implementation.
*
* @return \Illuminate\Container\Container
*/
public function getContainer()
{
return $this->container;
}
/**
* Get the underlying queue driver instance.
*
* @return \Illuminate\Redis\Database
* @return \Illuminate\Contracts\Redis\Factory
*/
public function getRedisQueue()
{
@@ -128,12 +128,12 @@ class RedisJob extends Job implements JobContract
}
/**
* Get the underlying Redis job.
* Get the underlying reserved Redis job.
*
* @return string
*/
public function getRedisJob()
public function getReservedJob()
{
return $this->job;
return $this->reserved;
}
}

View File

@@ -27,55 +27,18 @@ class SqsJob extends Job implements JobContract
*
* @param \Illuminate\Container\Container $container
* @param \Aws\Sqs\SqsClient $sqs
* @param string $queue
* @param array $job
* @param string $connectionName
* @param string $queue
* @return void
*/
public function __construct(Container $container,
SqsClient $sqs,
$queue,
array $job)
public function __construct(Container $container, SqsClient $sqs, array $job, $connectionName, $queue)
{
$this->sqs = $sqs;
$this->job = $job;
$this->queue = $queue;
$this->container = $container;
}
/**
* Fire the job.
*
* @return void
*/
public function fire()
{
$this->resolveAndFire(json_decode($this->getRawBody(), true));
}
/**
* Get the raw body string for the job.
*
* @return string
*/
public function getRawBody()
{
return $this->job['Body'];
}
/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
parent::delete();
$this->sqs->deleteMessage([
'QueueUrl' => $this->queue, 'ReceiptHandle' => $this->job['ReceiptHandle'],
]);
$this->connectionName = $connectionName;
}
/**
@@ -95,6 +58,20 @@ class SqsJob extends Job implements JobContract
]);
}
/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
parent::delete();
$this->sqs->deleteMessage([
'QueueUrl' => $this->queue, 'ReceiptHandle' => $this->job['ReceiptHandle'],
]);
}
/**
* Get the number of times the job has been attempted.
*
@@ -116,13 +93,13 @@ class SqsJob extends Job implements JobContract
}
/**
* Get the IoC container instance.
* Get the raw body string for the job.
*
* @return \Illuminate\Container\Container
* @return string
*/
public function getContainer()
public function getRawBody()
{
return $this->container;
return $this->job['Body'];
}
/**

View File

@@ -26,32 +26,16 @@ class SyncJob extends Job implements JobContract
*
* @param \Illuminate\Container\Container $container
* @param string $payload
* @param string $connectionName
* @param string $queue
* @return void
*/
public function __construct(Container $container, $payload)
public function __construct(Container $container, $payload, $connectionName, $queue)
{
$this->queue = $queue;
$this->payload = $payload;
$this->container = $container;
}
/**
* Fire the job.
*
* @return void
*/
public function fire()
{
$this->resolveAndFire(json_decode($this->payload, true));
}
/**
* Get the raw body string for the job.
*
* @return string
*/
public function getRawBody()
{
return $this->payload;
$this->connectionName = $connectionName;
}
/**
@@ -84,4 +68,24 @@ class SyncJob extends Job implements JobContract
{
return '';
}
/**
* Get the raw body string for the job.
*
* @return string
*/
public function getRawBody()
{
return $this->payload;
}
/**
* Get the name of the queue the job belongs to.
*
* @return string
*/
public function getQueue()
{
return 'sync';
}
}

View File

@@ -3,8 +3,8 @@
namespace Illuminate\Queue;
use Closure;
use Illuminate\Support\ProcessUtils;
use Symfony\Component\Process\Process;
use Symfony\Component\Process\ProcessUtils;
use Symfony\Component\Process\PhpExecutableFinder;
class Listener
@@ -60,7 +60,7 @@ class Listener
public function __construct($commandPath)
{
$this->commandPath = $commandPath;
$this->workerCommand = $this->buildWorkerCommand();
$this->workerCommand = $this->buildCommandTemplate();
}
/**
@@ -68,23 +68,35 @@ class Listener
*
* @return string
*/
protected function buildWorkerCommand()
protected function buildCommandTemplate()
{
$binary = ProcessUtils::escapeArgument((new PhpExecutableFinder)->find(false));
$command = 'queue:work %s --once --queue=%s --delay=%s --memory=%s --sleep=%s --tries=%s';
if (defined('HHVM_VERSION')) {
$binary .= ' --php';
}
return "{$this->phpBinary()} {$this->artisanBinary()} {$command}";
}
if (defined('ARTISAN_BINARY')) {
$artisan = ProcessUtils::escapeArgument(ARTISAN_BINARY);
} else {
$artisan = 'artisan';
}
/**
* Get the PHP binary.
*
* @return string
*/
protected function phpBinary()
{
return ProcessUtils::escapeArgument(
(new PhpExecutableFinder)->find(false)
);
}
$command = 'queue:work %s --queue=%s --delay=%s --memory=%s --sleep=%s --tries=%s';
return "{$binary} {$artisan} {$command}";
/**
* Get the Artisan binary.
*
* @return string
*/
protected function artisanBinary()
{
return defined('ARTISAN_BINARY')
? ProcessUtils::escapeArgument(ARTISAN_BINARY)
: 'artisan';
}
/**
@@ -92,20 +104,81 @@ class Listener
*
* @param string $connection
* @param string $queue
* @param string $delay
* @param string $memory
* @param int $timeout
* @param \Illuminate\Queue\ListenerOptions $options
* @return void
*/
public function listen($connection, $queue, $delay, $memory, $timeout = 60)
public function listen($connection, $queue, ListenerOptions $options)
{
$process = $this->makeProcess($connection, $queue, $delay, $memory, $timeout);
$process = $this->makeProcess($connection, $queue, $options);
while (true) {
$this->runProcess($process, $memory);
$this->runProcess($process, $options->memory);
}
}
/**
* Create a new Symfony process for the worker.
*
* @param string $connection
* @param string $queue
* @param \Illuminate\Queue\ListenerOptions $options
* @return \Symfony\Component\Process\Process
*/
public function makeProcess($connection, $queue, ListenerOptions $options)
{
$command = $this->workerCommand;
// If the environment is set, we will append it to the command string so the
// workers will run under the specified environment. Otherwise, they will
// just run under the production environment which is not always right.
if (isset($options->environment)) {
$command = $this->addEnvironment($command, $options);
}
// Next, we will just format out the worker commands with all of the various
// options available for the command. This will produce the final command
// line that we will pass into a Symfony process object for processing.
$command = $this->formatCommand(
$command, $connection, $queue, $options
);
return new Process(
$command, $this->commandPath, null, null, $options->timeout
);
}
/**
* Add the environment option to the given command.
*
* @param string $command
* @param \Illuminate\Queue\ListenerOptions $options
* @return string
*/
protected function addEnvironment($command, ListenerOptions $options)
{
return $command.' --env='.ProcessUtils::escapeArgument($options->environment);
}
/**
* Format the given command with the listener options.
*
* @param string $command
* @param string $connection
* @param string $queue
* @param \Illuminate\Queue\ListenerOptions $options
* @return string
*/
protected function formatCommand($command, $connection, $queue, ListenerOptions $options)
{
return sprintf(
$command,
ProcessUtils::escapeArgument($connection),
ProcessUtils::escapeArgument($queue),
$options->delay, $options->memory,
$options->sleep, $options->maxTries
);
}
/**
* Run the given process.
*
@@ -119,51 +192,14 @@ class Listener
$this->handleWorkerOutput($type, $line);
});
// Once we have run the job we'll go check if the memory limit has been
// exceeded for the script. If it has, we will kill this script so a
// process manager will restart this with a clean slate of memory.
// Once we have run the job we'll go check if the memory limit has been exceeded
// for the script. If it has, we will kill this script so the process manager
// will restart this with a clean slate of memory automatically on exiting.
if ($this->memoryExceeded($memory)) {
$this->stop();
}
}
/**
* Create a new Symfony process for the worker.
*
* @param string $connection
* @param string $queue
* @param int $delay
* @param int $memory
* @param int $timeout
* @return \Symfony\Component\Process\Process
*/
public function makeProcess($connection, $queue, $delay, $memory, $timeout)
{
$string = $this->workerCommand;
// If the environment is set, we will append it to the command string so the
// workers will run under the specified environment. Otherwise, they will
// just run under the production environment which is not always right.
if (isset($this->environment)) {
$string .= ' --env='.ProcessUtils::escapeArgument($this->environment);
}
// Next, we will just format out the worker commands with all of the various
// options available for the command. This will produce the final command
// line that we will pass into a Symfony process object for processing.
$command = sprintf(
$string,
ProcessUtils::escapeArgument($connection),
ProcessUtils::escapeArgument($queue),
$delay,
$memory,
$this->sleep,
$this->maxTries
);
return new Process($command, $this->commandPath, null, null, $timeout);
}
/**
* Handle output from the worker process.
*
@@ -209,57 +245,4 @@ class Listener
{
$this->outputHandler = $outputHandler;
}
/**
* Get the current listener environment.
*
* @return string
*/
public function getEnvironment()
{
return $this->environment;
}
/**
* Set the current environment.
*
* @param string $environment
* @return void
*/
public function setEnvironment($environment)
{
$this->environment = $environment;
}
/**
* Get the amount of seconds to wait before polling the queue.
*
* @return int
*/
public function getSleep()
{
return $this->sleep;
}
/**
* Set the amount of seconds to wait before polling the queue.
*
* @param int $sleep
* @return void
*/
public function setSleep($sleep)
{
$this->sleep = $sleep;
}
/**
* Set the amount of times to try a job before logging it failed.
*
* @param int $tries
* @return void
*/
public function setMaxTries($tries)
{
$this->maxTries = $tries;
}
}

View File

@@ -0,0 +1,32 @@
<?php
namespace Illuminate\Queue;
class ListenerOptions extends WorkerOptions
{
/**
* The environment the worker should run in.
*
* @var string
*/
public $environment;
/**
* Create a new listener options instance.
*
* @param string $environment
* @param int $delay
* @param int $memory
* @param int $timeout
* @param int $sleep
* @param int $maxTries
* @param bool $force
* @return void
*/
public function __construct($environment = null, $delay = 0, $memory = 128, $timeout = 60, $sleep = 3, $maxTries = 0, $force = false)
{
$this->environment = $environment;
parent::__construct($delay, $memory, $timeout, $sleep, $maxTries, $force);
}
}

View File

@@ -0,0 +1,103 @@
<?php
namespace Illuminate\Queue;
class LuaScripts
{
/**
* Get the Lua script for computing the size of queue.
*
* KEYS[1] - The name of the primary queue
* KEYS[2] - The name of the "delayed" queue
* KEYS[3] - The name of the "reserved" queue
*
* @return string
*/
public static function size()
{
return <<<'LUA'
return redis.call('llen', KEYS[1]) + redis.call('zcard', KEYS[2]) + redis.call('zcard', KEYS[3])
LUA;
}
/**
* Get the Lua script for popping the next job off of the queue.
*
* KEYS[1] - The queue to pop jobs from, for example: queues:foo
* KEYS[2] - The queue to place reserved jobs on, for example: queues:foo:reserved
* ARGV[1] - The time at which the reserved job will expire
*
* @return string
*/
public static function pop()
{
return <<<'LUA'
-- Pop the first job off of the queue...
local job = redis.call('lpop', KEYS[1])
local reserved = false
if(job ~= false) then
-- Increment the attempt count and place job on the reserved queue...
reserved = cjson.decode(job)
reserved['attempts'] = reserved['attempts'] + 1
reserved = cjson.encode(reserved)
redis.call('zadd', KEYS[2], ARGV[1], reserved)
end
return {job, reserved}
LUA;
}
/**
* Get the Lua script for releasing reserved jobs.
*
* KEYS[1] - The "delayed" queue we release jobs onto, for example: queues:foo:delayed
* KEYS[2] - The queue the jobs are currently on, for example: queues:foo:reserved
* ARGV[1] - The raw payload of the job to add to the "delayed" queue
* ARGV[2] - The UNIX timestamp at which the job should become available
*
* @return string
*/
public static function release()
{
return <<<'LUA'
-- Remove the job from the current queue...
redis.call('zrem', KEYS[2], ARGV[1])
-- Add the job onto the "delayed" queue...
redis.call('zadd', KEYS[1], ARGV[2], ARGV[1])
return true
LUA;
}
/**
* Get the Lua script to migrate expired jobs back onto the queue.
*
* KEYS[1] - The queue we are removing jobs from, for example: queues:foo:reserved
* KEYS[2] - The queue we are moving jobs to, for example: queues:foo
* ARGV[1] - The current UNIX timestamp
*
* @return string
*/
public static function migrateExpiredJobs()
{
return <<<'LUA'
-- Get all of the jobs with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
for i = 1, #val, 100 do
redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
end
end
return val
LUA;
}
}

View File

@@ -0,0 +1,10 @@
<?php
namespace Illuminate\Queue;
use RuntimeException;
class ManuallyFailedException extends RuntimeException
{
//
}

View File

@@ -0,0 +1,10 @@
<?php
namespace Illuminate\Queue;
use RuntimeException;
class MaxAttemptsExceededException extends RuntimeException
{
//
}

View File

@@ -6,6 +6,17 @@ use Illuminate\Contracts\Queue\Queue as QueueContract;
class NullQueue extends Queue implements QueueContract
{
/**
* Get the size of the queue.
*
* @param string $queue
* @return int
*/
public function size($queue = null)
{
return 0;
}
/**
* Push a new job onto the queue.
*
@@ -35,7 +46,7 @@ class NullQueue extends Queue implements QueueContract
/**
* Push a new job onto the queue after a delay.
*
* @param \DateTime|int $delay
* @param \DateTimeInterface|\DateInterval|int $delay
* @param string $job
* @param mixed $data
* @param string $queue

View File

@@ -2,17 +2,14 @@
namespace Illuminate\Queue;
use Closure;
use DateTime;
use Exception;
use Illuminate\Support\Arr;
use SuperClosure\Serializer;
use DateTimeInterface;
use Illuminate\Container\Container;
use Illuminate\Contracts\Encryption\Encrypter;
use Illuminate\Contracts\Queue\QueueableEntity;
use Illuminate\Support\InteractsWithTime;
abstract class Queue
{
use InteractsWithTime;
/**
* The IoC container instance.
*
@@ -27,6 +24,13 @@ abstract class Queue
*/
protected $encrypter;
/**
* The connection name for the queue.
*
* @var string
*/
protected $connectionName;
/**
* Push a new job onto the queue.
*
@@ -44,7 +48,7 @@ abstract class Queue
* Push a new job onto the queue after a delay.
*
* @param string $queue
* @param \DateTime|int $delay
* @param \DateTimeInterface|\DateInterval|int $delay
* @param string $job
* @param mixed $data
* @return mixed
@@ -74,129 +78,125 @@ abstract class Queue
*
* @param string $job
* @param mixed $data
* @param string $queue
* @return string
*
* @throws \Illuminate\Queue\InvalidPayloadException
*/
protected function createPayload($job, $data = '', $queue = null)
protected function createPayload($job, $data = '')
{
if ($job instanceof Closure) {
return json_encode($this->createClosurePayload($job, $data));
$payload = json_encode($this->createPayloadArray($job, $data));
if (JSON_ERROR_NONE !== json_last_error()) {
throw new InvalidPayloadException(
'Unable to JSON encode payload. Error code: '.json_last_error()
);
}
if (is_object($job)) {
return json_encode([
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
'data' => ['commandName' => get_class($job), 'command' => serialize(clone $job)],
]);
}
return json_encode($this->createPlainPayload($job, $data));
return $payload;
}
/**
* Create a typical, "plain" queue payload array.
* Create a payload array from the given job and data.
*
* @param string $job
* @param mixed $data
* @return array
*/
protected function createPayloadArray($job, $data = '')
{
return is_object($job)
? $this->createObjectPayload($job)
: $this->createStringPayload($job, $data);
}
/**
* Create a payload for an object-based queue handler.
*
* @param mixed $job
* @return array
*/
protected function createObjectPayload($job)
{
return [
'displayName' => $this->getDisplayName($job),
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
'maxTries' => $job->tries ?? null,
'timeout' => $job->timeout ?? null,
'timeoutAt' => $this->getJobExpiration($job),
'data' => [
'commandName' => get_class($job),
'command' => serialize(clone $job),
],
];
}
/**
* Get the display name for the given job.
*
* @param mixed $job
* @return string
*/
protected function getDisplayName($job)
{
return method_exists($job, 'displayName')
? $job->displayName() : get_class($job);
}
/**
* Get the expiration timestamp for an object-based queue handler.
*
* @param mixed $job
* @return mixed
*/
public function getJobExpiration($job)
{
if (! method_exists($job, 'retryUntil') && ! isset($job->timeoutAt)) {
return;
}
$expiration = $job->timeoutAt ?? $job->retryUntil();
return $expiration instanceof DateTimeInterface
? $expiration->getTimestamp() : $expiration;
}
/**
* Create a typical, string based queue payload array.
*
* @param string $job
* @param mixed $data
* @return array
*/
protected function createPlainPayload($job, $data)
protected function createStringPayload($job, $data)
{
return ['job' => $job, 'data' => $this->prepareQueueableEntities($data)];
return [
'displayName' => is_string($job) ? explode('@', $job)[0] : null,
'job' => $job, 'maxTries' => null,
'timeout' => null, 'data' => $data,
];
}
/**
* Prepare any queueable entities for storage in the queue.
* Get the connection name for the queue.
*
* @param mixed $data
* @return mixed
*/
protected function prepareQueueableEntities($data)
{
if ($data instanceof QueueableEntity) {
return $this->prepareQueueableEntity($data);
}
if (is_array($data)) {
$data = array_map(function ($d) {
if (is_array($d)) {
return $this->prepareQueueableEntities($d);
}
return $this->prepareQueueableEntity($d);
}, $data);
}
return $data;
}
/**
* Prepare a single queueable entity for storage on the queue.
*
* @param mixed $value
* @return mixed
*/
protected function prepareQueueableEntity($value)
{
if ($value instanceof QueueableEntity) {
return '::entity::|'.get_class($value).'|'.$value->getQueueableId();
}
return $value;
}
/**
* Create a payload string for the given Closure job.
*
* @param \Closure $job
* @param mixed $data
* @return array
*/
protected function createClosurePayload($job, $data)
{
$closure = $this->getEncrypter()->encrypt((new Serializer)->serialize($job));
return ['job' => 'IlluminateQueueClosure', 'data' => compact('closure')];
}
/**
* Set additional meta on a payload string.
*
* @param string $payload
* @param string $key
* @param string $value
* @return string
*/
protected function setMeta($payload, $key, $value)
public function getConnectionName()
{
$payload = json_decode($payload, true);
return json_encode(Arr::set($payload, $key, $value));
return $this->connectionName;
}
/**
* Calculate the number of seconds with the given delay.
* Set the connection name for the queue.
*
* @param \DateTime|int $delay
* @return int
* @param string $name
* @return $this
*/
protected function getSeconds($delay)
public function setConnectionName($name)
{
if ($delay instanceof DateTime) {
return max(0, $delay->getTimestamp() - $this->getTime());
}
$this->connectionName = $name;
return (int) $delay;
}
/**
* Get the current UNIX timestamp.
*
* @return int
*/
protected function getTime()
{
return time();
return $this;
}
/**
@@ -209,31 +209,4 @@ abstract class Queue
{
$this->container = $container;
}
/**
* Get the encrypter implementation.
*
* @return \Illuminate\Contracts\Encryption\Encrypter
*
* @throws \Exception
*/
protected function getEncrypter()
{
if (is_null($this->encrypter)) {
throw new Exception('No encrypter has been set on the Queue.');
}
return $this->encrypter;
}
/**
* Set the encrypter implementation.
*
* @param \Illuminate\Contracts\Encryption\Encrypter $encrypter
* @return void
*/
public function setEncrypter(Encrypter $encrypter)
{
$this->encrypter = $encrypter;
}
}

View File

@@ -7,6 +7,9 @@ use InvalidArgumentException;
use Illuminate\Contracts\Queue\Factory as FactoryContract;
use Illuminate\Contracts\Queue\Monitor as MonitorContract;
/**
* @mixin \Illuminate\Contracts\Queue\Queue
*/
class QueueManager implements FactoryContract, MonitorContract
{
/**
@@ -82,7 +85,7 @@ class QueueManager implements FactoryContract, MonitorContract
*/
public function looping($callback)
{
$this->app['events']->listen('illuminate.queue.looping', $callback);
$this->app['events']->listen(Events\Looping::class, $callback);
}
/**
@@ -135,8 +138,6 @@ class QueueManager implements FactoryContract, MonitorContract
$this->connections[$name] = $this->resolve($name);
$this->connections[$name]->setContainer($this->app);
$this->connections[$name]->setEncrypter($this->app['encrypter']);
}
return $this->connections[$name];
@@ -152,7 +153,9 @@ class QueueManager implements FactoryContract, MonitorContract
{
$config = $this->getConfig($name);
return $this->getConnector($config['driver'])->connect($config);
return $this->getConnector($config['driver'])
->connect($config)
->setConnectionName($name);
}
/**
@@ -165,11 +168,11 @@ class QueueManager implements FactoryContract, MonitorContract
*/
protected function getConnector($driver)
{
if (isset($this->connectors[$driver])) {
return call_user_func($this->connectors[$driver]);
if (! isset($this->connectors[$driver])) {
throw new InvalidArgumentException("No connector for [$driver]");
}
throw new InvalidArgumentException("No connector for [$driver]");
return call_user_func($this->connectors[$driver]);
}
/**
@@ -204,11 +207,11 @@ class QueueManager implements FactoryContract, MonitorContract
*/
protected function getConfig($name)
{
if ($name === null || $name === 'null') {
return ['driver' => 'null'];
if (! is_null($name) && $name !== 'null') {
return $this->app['config']["queue.connections.{$name}"];
}
return $this->app['config']["queue.connections.{$name}"];
return ['driver' => 'null'];
}
/**
@@ -262,8 +265,6 @@ class QueueManager implements FactoryContract, MonitorContract
*/
public function __call($method, $parameters)
{
$callable = [$this->connection(), $method];
return call_user_func_array($callable, $parameters);
return $this->connection()->$method(...$parameters);
}
}

View File

@@ -2,17 +2,14 @@
namespace Illuminate\Queue;
use IlluminateQueueClosure;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Console\WorkCommand;
use Illuminate\Queue\Console\ListenCommand;
use Illuminate\Queue\Console\RestartCommand;
use Illuminate\Queue\Connectors\SqsConnector;
use Illuminate\Queue\Connectors\NullConnector;
use Illuminate\Queue\Connectors\SyncConnector;
use Illuminate\Queue\Connectors\RedisConnector;
use Illuminate\Queue\Failed\NullFailedJobProvider;
use Illuminate\Contracts\Debug\ExceptionHandler;
use Illuminate\Queue\Connectors\DatabaseConnector;
use Illuminate\Queue\Failed\NullFailedJobProvider;
use Illuminate\Queue\Connectors\BeanstalkdConnector;
use Illuminate\Queue\Failed\DatabaseFailedJobProvider;
@@ -34,13 +31,13 @@ class QueueServiceProvider extends ServiceProvider
{
$this->registerManager();
$this->registerConnection();
$this->registerWorker();
$this->registerListener();
$this->registerFailedJobServices();
$this->registerQueueClosure();
}
/**
@@ -54,90 +51,24 @@ class QueueServiceProvider extends ServiceProvider
// Once we have an instance of the queue manager, we will register the various
// resolvers for the queue connectors. These connectors are responsible for
// creating the classes that accept queue configs and instantiate queues.
$manager = new QueueManager($app);
$this->registerConnectors($manager);
return $manager;
return tap(new QueueManager($app), function ($manager) {
$this->registerConnectors($manager);
});
});
}
/**
* Register the default queue connection binding.
*
* @return void
*/
protected function registerConnection()
{
$this->app->singleton('queue.connection', function ($app) {
return $app['queue']->connection();
});
}
/**
* Register the queue worker.
*
* @return void
*/
protected function registerWorker()
{
$this->registerWorkCommand();
$this->registerRestartCommand();
$this->app->singleton('queue.worker', function ($app) {
return new Worker($app['queue'], $app['queue.failer'], $app['events']);
});
}
/**
* Register the queue worker console command.
*
* @return void
*/
protected function registerWorkCommand()
{
$this->app->singleton('command.queue.work', function ($app) {
return new WorkCommand($app['queue.worker']);
});
$this->commands('command.queue.work');
}
/**
* Register the queue listener.
*
* @return void
*/
protected function registerListener()
{
$this->registerListenCommand();
$this->app->singleton('queue.listener', function ($app) {
return new Listener($app->basePath());
});
}
/**
* Register the queue listener console command.
*
* @return void
*/
protected function registerListenCommand()
{
$this->app->singleton('command.queue.listen', function ($app) {
return new ListenCommand($app['queue.listener']);
});
$this->commands('command.queue.listen');
}
/**
* Register the queue restart console command.
*
* @return void
*/
public function registerRestartCommand()
{
$this->app->singleton('command.queue.restart', function () {
return new RestartCommand;
});
$this->commands('command.queue.restart');
}
/**
* Register the connectors on the queue manager.
*
@@ -146,7 +77,7 @@ class QueueServiceProvider extends ServiceProvider
*/
public function registerConnectors($manager)
{
foreach (['Null', 'Sync', 'Database', 'Beanstalkd', 'Redis', 'Sqs'] as $connector) {
foreach (['Null', 'Sync', 'Database', 'Redis', 'Beanstalkd', 'Sqs'] as $connector) {
$this->{"register{$connector}Connector"}($manager);
}
}
@@ -177,19 +108,6 @@ class QueueServiceProvider extends ServiceProvider
});
}
/**
* Register the Beanstalkd queue connector.
*
* @param \Illuminate\Queue\QueueManager $manager
* @return void
*/
protected function registerBeanstalkdConnector($manager)
{
$manager->addConnector('beanstalkd', function () {
return new BeanstalkdConnector;
});
}
/**
* Register the database queue connector.
*
@@ -211,10 +129,21 @@ class QueueServiceProvider extends ServiceProvider
*/
protected function registerRedisConnector($manager)
{
$app = $this->app;
$manager->addConnector('redis', function () {
return new RedisConnector($this->app['redis']);
});
}
$manager->addConnector('redis', function () use ($app) {
return new RedisConnector($app['redis']);
/**
* Register the Beanstalkd queue connector.
*
* @param \Illuminate\Queue\QueueManager $manager
* @return void
*/
protected function registerBeanstalkdConnector($manager)
{
$manager->addConnector('beanstalkd', function () {
return new BeanstalkdConnector;
});
}
@@ -231,6 +160,32 @@ class QueueServiceProvider extends ServiceProvider
});
}
/**
* Register the queue worker.
*
* @return void
*/
protected function registerWorker()
{
$this->app->singleton('queue.worker', function () {
return new Worker(
$this->app['queue'], $this->app['events'], $this->app[ExceptionHandler::class]
);
});
}
/**
* Register the queue listener.
*
* @return void
*/
protected function registerListener()
{
$this->app->singleton('queue.listener', function () {
return new Listener($this->app->basePath());
});
}
/**
* Register the failed job services.
*
@@ -238,27 +193,26 @@ class QueueServiceProvider extends ServiceProvider
*/
protected function registerFailedJobServices()
{
$this->app->singleton('queue.failer', function ($app) {
$config = $app['config']['queue.failed'];
$this->app->singleton('queue.failer', function () {
$config = $this->app['config']['queue.failed'];
if (isset($config['table'])) {
return new DatabaseFailedJobProvider($app['db'], $config['database'], $config['table']);
} else {
return new NullFailedJobProvider;
}
return isset($config['table'])
? $this->databaseFailedJobProvider($config)
: new NullFailedJobProvider;
});
}
/**
* Register the Illuminate queued closure job.
* Create a new database failed job provider.
*
* @return void
* @param array $config
* @return \Illuminate\Queue\Failed\DatabaseFailedJobProvider
*/
protected function registerQueueClosure()
protected function databaseFailedJobProvider($config)
{
$this->app->singleton('IlluminateQueueClosure', function ($app) {
return new IlluminateQueueClosure($app['encrypter']);
});
return new DatabaseFailedJobProvider(
$this->app['db'], $config['database'], $config['table']
);
}
/**
@@ -269,9 +223,8 @@ class QueueServiceProvider extends ServiceProvider
public function provides()
{
return [
'queue', 'queue.worker', 'queue.listener', 'queue.failer',
'command.queue.work', 'command.queue.listen',
'command.queue.restart', 'queue.connection',
'queue', 'queue.worker', 'queue.listener',
'queue.failer', 'queue.connection',
];
}
}

View File

@@ -31,4 +31,4 @@ $queue->push('SendEmail', array('message' => $message));
Queue::push('SendEmail', array('message' => $message));
```
For further documentation on using the queue, consult the [Laravel framework documentation](http://laravel.com/docs).
For further documentation on using the queue, consult the [Laravel framework documentation](https://laravel.com/docs).

View File

@@ -2,18 +2,17 @@
namespace Illuminate\Queue;
use Illuminate\Support\Arr;
use Illuminate\Support\Str;
use Illuminate\Redis\Database;
use Illuminate\Queue\Jobs\RedisJob;
use Illuminate\Contracts\Redis\Factory as Redis;
use Illuminate\Contracts\Queue\Queue as QueueContract;
class RedisQueue extends Queue implements QueueContract
{
/**
* The Redis database instance.
* The Redis factory implementation.
*
* @var \Illuminate\Redis\Database
* @var \Illuminate\Contracts\Redis\Factory
*/
protected $redis;
@@ -36,27 +35,44 @@ class RedisQueue extends Queue implements QueueContract
*
* @var int|null
*/
protected $expire = 60;
protected $retryAfter = 60;
/**
* Create a new Redis queue instance.
*
* @param \Illuminate\Redis\Database $redis
* @param \Illuminate\Contracts\Redis\Factory $redis
* @param string $default
* @param string $connection
* @param int $retryAfter
* @return void
*/
public function __construct(Database $redis, $default = 'default', $connection = null)
public function __construct(Redis $redis, $default = 'default', $connection = null, $retryAfter = 60)
{
$this->redis = $redis;
$this->default = $default;
$this->connection = $connection;
$this->retryAfter = $retryAfter;
}
/**
* Get the size of the queue.
*
* @param string $queue
* @return int
*/
public function size($queue = null)
{
$queue = $this->getQueue($queue);
return $this->getConnection()->eval(
LuaScripts::size(), 3, $queue, $queue.':delayed', $queue.':reserved'
);
}
/**
* Push a new job onto the queue.
*
* @param string $job
* @param object|string $job
* @param mixed $data
* @param string $queue
* @return mixed
@@ -78,43 +94,53 @@ class RedisQueue extends Queue implements QueueContract
{
$this->getConnection()->rpush($this->getQueue($queue), $payload);
return Arr::get(json_decode($payload, true), 'id');
return json_decode($payload, true)['id'] ?? null;
}
/**
* Push a new job onto the queue after a delay.
*
* @param \DateTime|int $delay
* @param string $job
* @param \DateTimeInterface|\DateInterval|int $delay
* @param object|string $job
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function later($delay, $job, $data = '', $queue = null)
{
$payload = $this->createPayload($job, $data);
$delay = $this->getSeconds($delay);
$this->getConnection()->zadd($this->getQueue($queue).':delayed', $this->getTime() + $delay, $payload);
return Arr::get(json_decode($payload, true), 'id');
return $this->laterRaw($delay, $this->createPayload($job, $data), $queue);
}
/**
* Release a reserved job back onto the queue.
* Push a raw job onto the queue after a delay.
*
* @param string $queue
* @param \DateTimeInterface|\DateInterval|int $delay
* @param string $payload
* @param int $delay
* @param int $attempts
* @return void
* @param string $queue
* @return mixed
*/
public function release($queue, $payload, $delay, $attempts)
protected function laterRaw($delay, $payload, $queue = null)
{
$payload = $this->setMeta($payload, 'attempts', $attempts);
$this->getConnection()->zadd(
$this->getQueue($queue).':delayed', $this->availableAt($delay), $payload
);
$this->getConnection()->zadd($this->getQueue($queue).':delayed', $this->getTime() + $delay, $payload);
return json_decode($payload, true)['id'] ?? null;
}
/**
* Create a payload string from the given job and data.
*
* @param string $job
* @param mixed $data
* @return string
*/
protected function createPayloadArray($job, $data = '')
{
return array_merge(parent::createPayloadArray($job, $data), [
'id' => $this->getRandomId(),
'attempts' => 0,
]);
}
/**
@@ -125,46 +151,31 @@ class RedisQueue extends Queue implements QueueContract
*/
public function pop($queue = null)
{
$original = $queue ?: $this->default;
$this->migrate($prefixed = $this->getQueue($queue));
$queue = $this->getQueue($queue);
list($job, $reserved) = $this->retrieveNextJob($prefixed);
if (! is_null($this->expire)) {
$this->migrateAllExpiredJobs($queue);
}
$job = $this->getConnection()->lpop($queue);
if (! is_null($job)) {
$this->getConnection()->zadd($queue.':reserved', $this->getTime() + $this->expire, $job);
return new RedisJob($this->container, $this, $job, $original);
if ($reserved) {
return new RedisJob(
$this->container, $this, $job,
$reserved, $this->connectionName, $queue ?: $this->default
);
}
}
/**
* Delete a reserved job from the queue.
*
* @param string $queue
* @param string $job
* @return void
*/
public function deleteReserved($queue, $job)
{
$this->getConnection()->zrem($this->getQueue($queue).':reserved', $job);
}
/**
* Migrate all of the waiting jobs in the queue.
* Migrate any delayed or expired jobs onto the primary queue.
*
* @param string $queue
* @return void
*/
protected function migrateAllExpiredJobs($queue)
protected function migrate($queue)
{
$this->migrateExpiredJobs($queue.':delayed', $queue);
$this->migrateExpiredJobs($queue.':reserved', $queue);
if (! is_null($this->retryAfter)) {
$this->migrateExpiredJobs($queue.':reserved', $queue);
}
}
/**
@@ -172,87 +183,57 @@ class RedisQueue extends Queue implements QueueContract
*
* @param string $from
* @param string $to
* @return void
* @return array
*/
public function migrateExpiredJobs($from, $to)
{
$options = ['cas' => true, 'watch' => $from, 'retry' => 10];
$this->getConnection()->transaction($options, function ($transaction) use ($from, $to) {
// First we need to get all of jobs that have expired based on the current time
// so that we can push them onto the main queue. After we get them we simply
// remove them from this "delay" queues. All of this within a transaction.
$jobs = $this->getExpiredJobs(
$transaction, $from, $time = $this->getTime()
);
// If we actually found any jobs, we will remove them from the old queue and we
// will insert them onto the new (ready) "queue". This means they will stand
// ready to be processed by the queue worker whenever their turn comes up.
if (count($jobs) > 0) {
$this->removeExpiredJobs($transaction, $from, $time);
$this->pushExpiredJobsOntoNewQueue($transaction, $to, $jobs);
}
});
return $this->getConnection()->eval(
LuaScripts::migrateExpiredJobs(), 2, $from, $to, $this->currentTime()
);
}
/**
* Get the expired jobs from a given queue.
* Retrieve the next job from the queue.
*
* @param \Predis\Transaction\MultiExec $transaction
* @param string $from
* @param int $time
* @param string $queue
* @return array
*/
protected function getExpiredJobs($transaction, $from, $time)
protected function retrieveNextJob($queue)
{
return $transaction->zrangebyscore($from, '-inf', $time);
return $this->getConnection()->eval(
LuaScripts::pop(), 2, $queue, $queue.':reserved',
$this->availableAt($this->retryAfter)
);
}
/**
* Remove the expired jobs from a given queue.
* Delete a reserved job from the queue.
*
* @param \Predis\Transaction\MultiExec $transaction
* @param string $from
* @param int $time
* @return void
*/
protected function removeExpiredJobs($transaction, $from, $time)
{
$transaction->multi();
$transaction->zremrangebyscore($from, '-inf', $time);
}
/**
* Push all of the given jobs onto another queue.
*
* @param \Predis\Transaction\MultiExec $transaction
* @param string $to
* @param array $jobs
* @return void
*/
protected function pushExpiredJobsOntoNewQueue($transaction, $to, $jobs)
{
call_user_func_array([$transaction, 'rpush'], array_merge([$to], $jobs));
}
/**
* Create a payload string from the given job and data.
*
* @param string $job
* @param mixed $data
* @param string $queue
* @return string
* @param \Illuminate\Queue\Jobs\RedisJob $job
* @return void
*/
protected function createPayload($job, $data = '', $queue = null)
public function deleteReserved($queue, $job)
{
$payload = parent::createPayload($job, $data);
$this->getConnection()->zrem($this->getQueue($queue).':reserved', $job->getReservedJob());
}
$payload = $this->setMeta($payload, 'id', $this->getRandomId());
/**
* Delete a reserved job from the reserved queue and release it.
*
* @param string $queue
* @param \Illuminate\Queue\Jobs\RedisJob $job
* @param int $delay
* @return void
*/
public function deleteAndRelease($queue, $job, $delay)
{
$queue = $this->getQueue($queue);
return $this->setMeta($payload, 'attempts', 1);
$this->getConnection()->eval(
LuaScripts::release(), 2, $queue.':delayed', $queue.':reserved',
$job->getReservedJob(), $this->availableAt($delay)
);
}
/**
@@ -271,7 +252,7 @@ class RedisQueue extends Queue implements QueueContract
* @param string|null $queue
* @return string
*/
protected function getQueue($queue)
public function getQueue($queue)
{
return 'queues:'.($queue ?: $this->default);
}
@@ -279,7 +260,7 @@ class RedisQueue extends Queue implements QueueContract
/**
* Get the connection for the queue.
*
* @return \Predis\ClientInterface
* @return \Illuminate\Redis\Connections\Connection
*/
protected function getConnection()
{
@@ -289,31 +270,10 @@ class RedisQueue extends Queue implements QueueContract
/**
* Get the underlying Redis instance.
*
* @return \Illuminate\Redis\Database
* @return \Illuminate\Contracts\Redis\Factory
*/
public function getRedis()
{
return $this->redis;
}
/**
* Get the expiration time in seconds.
*
* @return int|null
*/
public function getExpire()
{
return $this->expire;
}
/**
* Set the expiration time in seconds.
*
* @param int|null $seconds
* @return void
*/
public function setExpire($seconds)
{
$this->expire = $seconds;
}
}

View File

@@ -0,0 +1,85 @@
<?php
namespace Illuminate\Queue;
use Illuminate\Contracts\Queue\QueueableEntity;
use Illuminate\Contracts\Database\ModelIdentifier;
use Illuminate\Contracts\Queue\QueueableCollection;
use Illuminate\Database\Eloquent\Collection as EloquentCollection;
trait SerializesAndRestoresModelIdentifiers
{
/**
* Get the property value prepared for serialization.
*
* @param mixed $value
* @return mixed
*/
protected function getSerializedPropertyValue($value)
{
if ($value instanceof QueueableCollection) {
return new ModelIdentifier(
$value->getQueueableClass(),
$value->getQueueableIds(),
$value->getQueueableConnection()
);
}
if ($value instanceof QueueableEntity) {
return new ModelIdentifier(
get_class($value),
$value->getQueueableId(),
$value->getQueueableConnection()
);
}
return $value;
}
/**
* Get the restored property value after deserialization.
*
* @param mixed $value
* @return mixed
*/
protected function getRestoredPropertyValue($value)
{
if (! $value instanceof ModelIdentifier) {
return $value;
}
return is_array($value->id)
? $this->restoreCollection($value)
: $this->getQueryForModelRestoration((new $value->class)->setConnection($value->connection), $value->id)
->useWritePdo()->firstOrFail();
}
/**
* Restore a queueable collection instance.
*
* @param \Illuminate\Contracts\Database\ModelIdentifier $value
* @return \Illuminate\Database\Eloquent\Collection
*/
protected function restoreCollection($value)
{
if (! $value->class || count($value->id) === 0) {
return new EloquentCollection;
}
return $this->getQueryForModelRestoration(
(new $value->class)->setConnection($value->connection), $value->id
)->useWritePdo()->get();
}
/**
* Get the query for restoration.
*
* @param \Illuminate\Database\Eloquent\Model $model
* @param array|int $ids
* @return \Illuminate\Database\Eloquent\Builder
*/
protected function getQueryForModelRestoration($model, $ids)
{
return $model->newQueryForRestoration($ids);
}
}

View File

@@ -4,12 +4,11 @@ namespace Illuminate\Queue;
use ReflectionClass;
use ReflectionProperty;
use Illuminate\Contracts\Queue\QueueableEntity;
use Illuminate\Contracts\Database\ModelIdentifier;
use Illuminate\Database\Eloquent\Collection as EloquentCollection;
trait SerializesModels
{
use SerializesAndRestoresModelIdentifiers;
/**
* Prepare the instance for serialization.
*
@@ -44,56 +43,6 @@ trait SerializesModels
}
}
/**
* Get the property value prepared for serialization.
*
* @param mixed $value
* @return mixed
*/
protected function getSerializedPropertyValue($value)
{
if ($value instanceof QueueableEntity) {
return new ModelIdentifier(get_class($value), $value->getQueueableId());
}
return $value;
}
/**
* Get the restored property value after deserialization.
*
* @param mixed $value
* @return mixed
*/
protected function getRestoredPropertyValue($value)
{
if (! $value instanceof ModelIdentifier) {
return $value;
}
return is_array($value->id)
? $this->restoreCollection($value)
: (new $value->class)->newQuery()->useWritePdo()->findOrFail($value->id);
}
/**
* Restore a queueable collection instance.
*
* @param \Illuminate\Contracts\Database\ModelIdentifier $value
* @return \Illuminate\Database\Eloquent\Collection
*/
protected function restoreCollection($value)
{
if (! $value->class || count($value->id) === 0) {
return new EloquentCollection;
}
$model = new $value->class;
return $model->newQuery()->useWritePdo()
->whereIn($model->getKeyName(), $value->id)->get();
}
/**
* Get the property value for the given property.
*

View File

@@ -16,26 +16,19 @@ class SqsQueue extends Queue implements QueueContract
protected $sqs;
/**
* The name of the default tube.
* The name of the default queue.
*
* @var string
*/
protected $default;
/**
* The sqs prefix url.
* The queue URL prefix.
*
* @var string
*/
protected $prefix;
/**
* The job creator callback.
*
* @var callable|null
*/
protected $jobCreator;
/**
* Create a new Amazon SQS queue instance.
*
@@ -51,6 +44,24 @@ class SqsQueue extends Queue implements QueueContract
$this->default = $default;
}
/**
* Get the size of the queue.
*
* @param string $queue
* @return int
*/
public function size($queue = null)
{
$response = $this->sqs->getQueueAttributes([
'QueueUrl' => $this->getQueue($queue),
'AttributeNames' => ['ApproximateNumberOfMessages'],
]);
$attributes = $response->get('Attributes');
return (int) $attributes['ApproximateNumberOfMessages'];
}
/**
* Push a new job onto the queue.
*
@@ -74,15 +85,15 @@ class SqsQueue extends Queue implements QueueContract
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
$response = $this->sqs->sendMessage(['QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload]);
return $response->get('MessageId');
return $this->sqs->sendMessage([
'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload,
])->get('MessageId');
}
/**
* Push a new job onto the queue after a delay.
*
* @param \DateTime|int $delay
* @param \DateTimeInterface|\DateInterval|int $delay
* @param string $job
* @param mixed $data
* @param string $queue
@@ -90,13 +101,10 @@ class SqsQueue extends Queue implements QueueContract
*/
public function later($delay, $job, $data = '', $queue = null)
{
$payload = $this->createPayload($job, $data);
$delay = $this->getSeconds($delay);
return $this->sqs->sendMessage([
'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, 'DelaySeconds' => $delay,
'QueueUrl' => $this->getQueue($queue),
'MessageBody' => $this->createPayload($job, $data),
'DelaySeconds' => $this->secondsUntil($delay),
])->get('MessageId');
}
@@ -108,34 +116,19 @@ class SqsQueue extends Queue implements QueueContract
*/
public function pop($queue = null)
{
$queue = $this->getQueue($queue);
$response = $this->sqs->receiveMessage([
'QueueUrl' => $queue = $this->getQueue($queue),
'AttributeNames' => ['ApproximateReceiveCount'],
]);
$response = $this->sqs->receiveMessage(
['QueueUrl' => $queue, 'AttributeNames' => ['ApproximateReceiveCount']]
);
if (count($response['Messages']) > 0) {
if ($this->jobCreator) {
return call_user_func($this->jobCreator, $this->container, $this->sqs, $queue, $response);
} else {
return new SqsJob($this->container, $this->sqs, $queue, $response['Messages'][0]);
}
if (! is_null($response['Messages']) && count($response['Messages']) > 0) {
return new SqsJob(
$this->container, $this->sqs, $response['Messages'][0],
$this->connectionName, $queue
);
}
}
/**
* Define the job creator callback for the connection.
*
* @param callable $callback
* @return $this
*/
public function createJobsUsing(callable $callback)
{
$this->jobCreator = $callback;
return $this;
}
/**
* Get the queue or return the default.
*
@@ -146,11 +139,8 @@ class SqsQueue extends Queue implements QueueContract
{
$queue = $queue ?: $this->default;
if (filter_var($queue, FILTER_VALIDATE_URL) !== false) {
return $queue;
}
return rtrim($this->prefix, '/').'/'.($queue);
return filter_var($queue, FILTER_VALIDATE_URL) === false
? rtrim($this->prefix, '/').'/'.$queue : $queue;
}
/**

View File

@@ -7,9 +7,21 @@ use Throwable;
use Illuminate\Queue\Jobs\SyncJob;
use Illuminate\Contracts\Queue\Job;
use Illuminate\Contracts\Queue\Queue as QueueContract;
use Symfony\Component\Debug\Exception\FatalThrowableError;
class SyncQueue extends Queue implements QueueContract
{
/**
* Get the size of the queue.
*
* @param string $queue
* @return int
*/
public function size($queue = null)
{
return 0;
}
/**
* Push a new job onto the queue.
*
@@ -22,7 +34,7 @@ class SyncQueue extends Queue implements QueueContract
*/
public function push($job, $data = '', $queue = null)
{
$queueJob = $this->resolveJob($this->createPayload($job, $data, $queue));
$queueJob = $this->resolveJob($this->createPayload($job, $data), $queue);
try {
$this->raiseBeforeJobEvent($queueJob);
@@ -31,22 +43,84 @@ class SyncQueue extends Queue implements QueueContract
$this->raiseAfterJobEvent($queueJob);
} catch (Exception $e) {
$this->raiseExceptionOccurredJobEvent($queueJob, $e);
$this->handleFailedJob($queueJob);
throw $e;
$this->handleException($queueJob, $e);
} catch (Throwable $e) {
$this->raiseExceptionOccurredJobEvent($queueJob, $e);
$this->handleFailedJob($queueJob);
throw $e;
$this->handleException($queueJob, new FatalThrowableError($e));
}
return 0;
}
/**
* Resolve a Sync job instance.
*
* @param string $payload
* @param string $queue
* @return \Illuminate\Queue\Jobs\SyncJob
*/
protected function resolveJob($payload, $queue)
{
return new SyncJob($this->container, $payload, $this->connectionName, $queue);
}
/**
* Raise the before queue job event.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @return void
*/
protected function raiseBeforeJobEvent(Job $job)
{
if ($this->container->bound('events')) {
$this->container['events']->dispatch(new Events\JobProcessing($this->connectionName, $job));
}
}
/**
* Raise the after queue job event.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @return void
*/
protected function raiseAfterJobEvent(Job $job)
{
if ($this->container->bound('events')) {
$this->container['events']->dispatch(new Events\JobProcessed($this->connectionName, $job));
}
}
/**
* Raise the exception occurred queue job event.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Exception $e
* @return void
*/
protected function raiseExceptionOccurredJobEvent(Job $job, $e)
{
if ($this->container->bound('events')) {
$this->container['events']->dispatch(new Events\JobExceptionOccurred($this->connectionName, $job, $e));
}
}
/**
* Handle an exception that occurred while processing a job.
*
* @param \Illuminate\Queue\Jobs\Job $queueJob
* @param \Exception $e
* @return void
*
* @throws \Exception
*/
protected function handleException($queueJob, $e)
{
$this->raiseExceptionOccurredJobEvent($queueJob, $e);
FailingJob::handle($this->connectionName, $queueJob, $e);
throw $e;
}
/**
* Push a raw payload onto the queue.
*
@@ -63,7 +137,7 @@ class SyncQueue extends Queue implements QueueContract
/**
* Push a new job onto the queue after a delay.
*
* @param \DateTime|int $delay
* @param \DateTimeInterface|\DateInterval|int $delay
* @param string $job
* @param mixed $data
* @param string $queue
@@ -84,89 +158,4 @@ class SyncQueue extends Queue implements QueueContract
{
//
}
/**
* Resolve a Sync job instance.
*
* @param string $payload
* @return \Illuminate\Queue\Jobs\SyncJob
*/
protected function resolveJob($payload)
{
return new SyncJob($this->container, $payload);
}
/**
* Raise the before queue job event.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @return void
*/
protected function raiseBeforeJobEvent(Job $job)
{
$data = json_decode($job->getRawBody(), true);
if ($this->container->bound('events')) {
$this->container['events']->fire(new Events\JobProcessing('sync', $job, $data));
}
}
/**
* Raise the after queue job event.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @return void
*/
protected function raiseAfterJobEvent(Job $job)
{
$data = json_decode($job->getRawBody(), true);
if ($this->container->bound('events')) {
$this->container['events']->fire(new Events\JobProcessed('sync', $job, $data));
}
}
/**
* Raise the exception occurred queue job event.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Throwable $exception
* @return void
*/
protected function raiseExceptionOccurredJobEvent(Job $job, $exception)
{
$data = json_decode($job->getRawBody(), true);
if ($this->container->bound('events')) {
$this->container['events']->fire(new Events\JobExceptionOccurred('sync', $job, $data, $exception));
}
}
/**
* Handle the failed job.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @return array
*/
protected function handleFailedJob(Job $job)
{
$job->failed();
$this->raiseFailedJobEvent($job);
}
/**
* Raise the failed queue job event.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @return void
*/
protected function raiseFailedJobEvent(Job $job)
{
$data = json_decode($job->getRawBody(), true);
if ($this->container->bound('events')) {
$this->container['events']->fire(new Events\JobFailed('sync', $job, $data));
}
}
}

View File

@@ -4,15 +4,17 @@ namespace Illuminate\Queue;
use Exception;
use Throwable;
use Illuminate\Contracts\Queue\Job;
use Illuminate\Support\Carbon;
use Illuminate\Contracts\Events\Dispatcher;
use Illuminate\Database\DetectsLostConnections;
use Illuminate\Contracts\Debug\ExceptionHandler;
use Illuminate\Queue\Failed\FailedJobProviderInterface;
use Symfony\Component\Debug\Exception\FatalThrowableError;
use Illuminate\Contracts\Cache\Repository as CacheContract;
class Worker
{
use DetectsLostConnections;
/**
* The queue manager instance.
*
@@ -20,13 +22,6 @@ class Worker
*/
protected $manager;
/**
* The failed job provider implementation.
*
* @var \Illuminate\Queue\Failed\FailedJobProviderInterface
*/
protected $failer;
/**
* The event dispatcher instance.
*
@@ -44,25 +39,39 @@ class Worker
/**
* The exception handler instance.
*
* @var \Illuminate\Foundation\Exceptions\Handler
* @var \Illuminate\Contracts\Debug\ExceptionHandler
*/
protected $exceptions;
/**
* Indicates if the worker should exit.
*
* @var bool
*/
public $shouldQuit = false;
/**
* Indicates if the worker is paused.
*
* @var bool
*/
public $paused = false;
/**
* Create a new queue worker.
*
* @param \Illuminate\Queue\QueueManager $manager
* @param \Illuminate\Queue\Failed\FailedJobProviderInterface $failer
* @param \Illuminate\Contracts\Events\Dispatcher $events
* @param \Illuminate\Contracts\Debug\ExceptionHandler $exceptions
* @return void
*/
public function __construct(QueueManager $manager,
FailedJobProviderInterface $failer = null,
Dispatcher $events = null)
Dispatcher $events,
ExceptionHandler $exceptions)
{
$this->failer = $failer;
$this->events = $events;
$this->manager = $manager;
$this->exceptions = $exceptions;
}
/**
@@ -70,101 +79,155 @@ class Worker
*
* @param string $connectionName
* @param string $queue
* @param int $delay
* @param int $memory
* @param int $sleep
* @param int $maxTries
* @return array
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
public function daemon($connectionName, $queue = null, $delay = 0, $memory = 128, $sleep = 3, $maxTries = 0)
public function daemon($connectionName, $queue, WorkerOptions $options)
{
$this->listenForSignals();
$lastRestart = $this->getTimestampOfLastQueueRestart();
while (true) {
if ($this->daemonShouldRun()) {
$this->runNextJobForDaemon(
$connectionName, $queue, $delay, $sleep, $maxTries
);
} else {
$this->sleep($sleep);
// Before reserving any jobs, we will make sure this queue is not paused and
// if it is we will just pause this worker for a given amount of time and
// make sure we do not need to kill this worker process off completely.
if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
$this->pauseWorker($options, $lastRestart);
continue;
}
if ($this->memoryExceeded($memory) || $this->queueShouldRestart($lastRestart)) {
$this->stop();
// First, we will attempt to get the next job off of the queue. We will also
// register the timeout handler and reset the alarm for this job so it is
// not stuck in a frozen state forever. Then, we can fire off this job.
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
);
$this->registerTimeoutHandler($job, $options);
// If the daemon should run (not in maintenance mode, etc.), then we can run
// fire off this job for processing. Otherwise, we will need to sleep the
// worker so no more jobs are processed until they should be processed.
if ($job) {
$this->runJob($job, $connectionName, $options);
} else {
$this->sleep($options->sleep);
}
// Finally, we will check to see if we have exceeded our memory limits or if
// the queue should restart based on other indications. If so, we'll stop
// this worker and let whatever is "monitoring" it restart the process.
$this->stopIfNecessary($options, $lastRestart);
}
}
/**
* Run the next job for the daemon worker.
* Register the worker timeout handler (PHP 7.1+).
*
* @param string $connectionName
* @param string $queue
* @param int $delay
* @param int $sleep
* @param int $maxTries
* @param \Illuminate\Contracts\Queue\Job|null $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
protected function runNextJobForDaemon($connectionName, $queue, $delay, $sleep, $maxTries)
protected function registerTimeoutHandler($job, WorkerOptions $options)
{
try {
$this->pop($connectionName, $queue, $delay, $sleep, $maxTries);
} catch (Exception $e) {
if ($this->exceptions) {
$this->exceptions->report($e);
}
} catch (Throwable $e) {
if ($this->exceptions) {
$this->exceptions->report(new FatalThrowableError($e));
}
if ($this->supportsAsyncSignals()) {
// We will register a signal handler for the alarm signal so that we can kill this
// process if it is running too long because it has frozen. This uses the async
// signals supported in recent versions of PHP to accomplish it conveniently.
pcntl_signal(SIGALRM, function () {
$this->kill(1);
});
pcntl_alarm(
max($this->timeoutForJob($job, $options), 0)
);
}
}
/**
* Get the appropriate timeout for the given job.
*
* @param \Illuminate\Contracts\Queue\Job|null $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return int
*/
protected function timeoutForJob($job, WorkerOptions $options)
{
return $job && ! is_null($job->timeout()) ? $job->timeout() : $options->timeout;
}
/**
* Determine if the daemon should process on this iteration.
*
* @param \Illuminate\Queue\WorkerOptions $options
* @param string $connectionName
* @param string $queue
* @return bool
*/
protected function daemonShouldRun()
protected function daemonShouldRun(WorkerOptions $options, $connectionName, $queue)
{
return $this->manager->isDownForMaintenance()
? false : $this->events->until('illuminate.queue.looping') !== false;
return ! (($this->manager->isDownForMaintenance() && ! $options->force) ||
$this->paused ||
$this->events->until(new Events\Looping($connectionName, $queue)) === false);
}
/**
* Listen to the given queue.
* Pause the worker for the current loop.
*
* @param \Illuminate\Queue\WorkerOptions $options
* @param int $lastRestart
* @return void
*/
protected function pauseWorker(WorkerOptions $options, $lastRestart)
{
$this->sleep($options->sleep > 0 ? $options->sleep : 1);
$this->stopIfNecessary($options, $lastRestart);
}
/**
* Stop the process if necessary.
*
* @param \Illuminate\Queue\WorkerOptions $options
* @param int $lastRestart
*/
protected function stopIfNecessary(WorkerOptions $options, $lastRestart)
{
if ($this->shouldQuit) {
$this->kill();
}
if ($this->memoryExceeded($options->memory)) {
$this->stop(12);
} elseif ($this->queueShouldRestart($lastRestart)) {
$this->stop();
}
}
/**
* Process the next job on the queue.
*
* @param string $connectionName
* @param string $queue
* @param int $delay
* @param int $sleep
* @param int $maxTries
* @return array
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
public function pop($connectionName, $queue = null, $delay = 0, $sleep = 3, $maxTries = 0)
public function runNextJob($connectionName, $queue, WorkerOptions $options)
{
try {
$connection = $this->manager->connection($connectionName);
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
);
$job = $this->getNextJob($connection, $queue);
// If we're able to pull a job off of the stack, we will process it and
// then immediately return back out. If there is no job on the queue
// we will "sleep" the worker for the specified number of seconds.
if (! is_null($job)) {
return $this->process(
$this->manager->getName($connectionName), $job, $maxTries, $delay
);
}
} catch (Exception $e) {
if ($this->exceptions) {
$this->exceptions->report($e);
}
// If we're able to pull a job off of the stack, we will process it and then return
// from this method. If there is no job on the queue, we will "sleep" the worker
// for the specified number of seconds, then keep processing jobs after sleep.
if ($job) {
return $this->runJob($job, $connectionName, $options);
}
$this->sleep($sleep);
return ['job' => null, 'failed' => false];
$this->sleep($options->sleep);
}
/**
@@ -176,169 +239,316 @@ class Worker
*/
protected function getNextJob($connection, $queue)
{
if (is_null($queue)) {
return $connection->pop();
}
foreach (explode(',', $queue) as $queue) {
if (! is_null($job = $connection->pop($queue))) {
return $job;
try {
foreach (explode(',', $queue) as $queue) {
if (! is_null($job = $connection->pop($queue))) {
return $job;
}
}
} catch (Exception $e) {
$this->exceptions->report($e);
$this->stopWorkerIfLostConnection($e);
} catch (Throwable $e) {
$this->exceptions->report($e = new FatalThrowableError($e));
$this->stopWorkerIfLostConnection($e);
}
}
/**
* Process a given job from the queue.
* Process the given job.
*
* @param string $connection
* @param \Illuminate\Contracts\Queue\Job $job
* @param int $maxTries
* @param int $delay
* @return array|null
* @param string $connectionName
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
protected function runJob($job, $connectionName, WorkerOptions $options)
{
try {
return $this->process($connectionName, $job, $options);
} catch (Exception $e) {
$this->exceptions->report($e);
$this->stopWorkerIfLostConnection($e);
} catch (Throwable $e) {
$this->exceptions->report($e = new FatalThrowableError($e));
$this->stopWorkerIfLostConnection($e);
}
}
/**
* Stop the worker if we have lost connection to a database.
*
* @param \Exception $e
* @return void
*/
protected function stopWorkerIfLostConnection($e)
{
if ($this->causedByLostConnection($e)) {
$this->shouldQuit = true;
}
}
/**
* Process the given job from the queue.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*
* @throws \Throwable
*/
public function process($connection, Job $job, $maxTries = 0, $delay = 0)
public function process($connectionName, $job, WorkerOptions $options)
{
if ($maxTries > 0 && $job->attempts() > $maxTries) {
return $this->logFailedJob($connection, $job);
}
try {
$this->raiseBeforeJobEvent($connection, $job);
// First we will raise the before job event and determine if the job has already ran
// over its maximum attempt limits, which could primarily happen when this job is
// continually timing out and not actually throwing any exceptions from itself.
$this->raiseBeforeJobEvent($connectionName, $job);
// First we will fire off the job. Once it is done we will see if it will be
// automatically deleted after processing and if so we'll fire the delete
// method on the job. Otherwise, we will just keep on running our jobs.
$this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
$connectionName, $job, (int) $options->maxTries
);
// Here we will fire off the job and let it process. We will catch any exceptions so
// they can be reported to the developers logs, etc. Once the job is finished the
// proper events will be fired to let any listeners know this job has finished.
$job->fire();
$this->raiseAfterJobEvent($connection, $job);
return ['job' => $job, 'failed' => false];
$this->raiseAfterJobEvent($connectionName, $job);
} catch (Exception $e) {
$this->handleJobException($connection, $job, $delay, $e);
$this->handleJobException($connectionName, $job, $options, $e);
} catch (Throwable $e) {
$this->handleJobException($connection, $job, $delay, $e);
$this->handleJobException(
$connectionName, $job, $options, new FatalThrowableError($e)
);
}
}
/**
* Handle an exception that occurred while the job was running.
*
* @param string $connection
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param int $delay
* @param \Throwable $e
* @param \Illuminate\Queue\WorkerOptions $options
* @param \Exception $e
* @return void
*
* @throws \Throwable
* @throws \Exception
*/
protected function handleJobException($connection, Job $job, $delay, $e)
protected function handleJobException($connectionName, $job, WorkerOptions $options, $e)
{
// If we catch an exception, we will attempt to release the job back onto
// the queue so it is not lost. This will let is be retried at a later
// time by another listener (or the same one). We will do that here.
try {
// First, we will go ahead and mark the job as failed if it will exceed the maximum
// attempts it is allowed to run the next time we process it. If so we will just
// go ahead and mark it as failed now so we do not have to release this again.
if (! $job->hasFailed()) {
$this->markJobAsFailedIfWillExceedMaxAttempts(
$connectionName, $job, (int) $options->maxTries, $e
);
}
$this->raiseExceptionOccurredJobEvent(
$connection, $job, $e
$connectionName, $job, $e
);
} finally {
if (! $job->isDeleted()) {
$job->release($delay);
// If we catch an exception, we will attempt to release the job back onto the queue
// so it is not lost entirely. This'll let the job be retried at a later time by
// another listener (or this same one). We will re-throw this exception after.
if (! $job->isDeleted() && ! $job->isReleased() && ! $job->hasFailed()) {
$job->release($options->delay);
}
}
throw $e;
}
/**
* Mark the given job as failed if it has exceeded the maximum allowed attempts.
*
* This will likely be because the job previously exceeded a timeout.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param int $maxTries
* @return void
*/
protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
{
$maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;
$timeoutAt = $job->timeoutAt();
if ($timeoutAt && Carbon::now()->getTimestamp() <= $timeoutAt) {
return;
}
if (! $timeoutAt && ($maxTries === 0 || $job->attempts() <= $maxTries)) {
return;
}
$this->failJob($connectionName, $job, $e = new MaxAttemptsExceededException(
$job->resolveName().' has been attempted too many times or run too long. The job may have previously timed out.'
));
throw $e;
}
/**
* Mark the given job as failed if it has exceeded the maximum allowed attempts.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param int $maxTries
* @param \Exception $e
* @return void
*/
protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e)
{
$maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;
if ($job->timeoutAt() && $job->timeoutAt() <= Carbon::now()->getTimestamp()) {
$this->failJob($connectionName, $job, $e);
}
if ($maxTries > 0 && $job->attempts() >= $maxTries) {
$this->failJob($connectionName, $job, $e);
}
}
/**
* Mark the given job as failed and raise the relevant event.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Exception $e
* @return void
*/
protected function failJob($connectionName, $job, $e)
{
return FailingJob::handle($connectionName, $job, $e);
}
/**
* Raise the before queue job event.
*
* @param string $connection
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @return void
*/
protected function raiseBeforeJobEvent($connection, Job $job)
protected function raiseBeforeJobEvent($connectionName, $job)
{
if ($this->events) {
$data = json_decode($job->getRawBody(), true);
$this->events->fire(new Events\JobProcessing($connection, $job, $data));
}
$this->events->dispatch(new Events\JobProcessing(
$connectionName, $job
));
}
/**
* Raise the after queue job event.
*
* @param string $connection
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @return void
*/
protected function raiseAfterJobEvent($connection, Job $job)
protected function raiseAfterJobEvent($connectionName, $job)
{
if ($this->events) {
$data = json_decode($job->getRawBody(), true);
$this->events->fire(new Events\JobProcessed($connection, $job, $data));
}
$this->events->dispatch(new Events\JobProcessed(
$connectionName, $job
));
}
/**
* Raise the exception occurred queue job event.
*
* @param string $connection
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Throwable $exception
* @param \Exception $e
* @return void
*/
protected function raiseExceptionOccurredJobEvent($connection, Job $job, $exception)
protected function raiseExceptionOccurredJobEvent($connectionName, $job, $e)
{
if ($this->events) {
$data = json_decode($job->getRawBody(), true);
$this->events->fire(new Events\JobExceptionOccurred($connection, $job, $data, $exception));
}
}
/**
* Log a failed job into storage.
*
* @param string $connection
* @param \Illuminate\Contracts\Queue\Job $job
* @return array
*/
protected function logFailedJob($connection, Job $job)
{
if ($this->failer) {
$failedId = $this->failer->log($connection, $job->getQueue(), $job->getRawBody());
$job->delete();
$job->failed();
$this->raiseFailedJobEvent($connection, $job, $failedId);
}
return ['job' => $job, 'failed' => true];
$this->events->dispatch(new Events\JobExceptionOccurred(
$connectionName, $job, $e
));
}
/**
* Raise the failed queue job event.
*
* @param string $connection
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param int|null $failedId
* @param \Exception $e
* @return void
*/
protected function raiseFailedJobEvent($connection, Job $job, $failedId)
protected function raiseFailedJobEvent($connectionName, $job, $e)
{
if ($this->events) {
$data = json_decode($job->getRawBody(), true);
$this->events->dispatch(new Events\JobFailed(
$connectionName, $job, $e
));
}
$this->events->fire(new Events\JobFailed($connection, $job, $data, $failedId));
/**
* Determine if the queue worker should restart.
*
* @param int|null $lastRestart
* @return bool
*/
protected function queueShouldRestart($lastRestart)
{
return $this->getTimestampOfLastQueueRestart() != $lastRestart;
}
/**
* Get the last queue restart timestamp, or null.
*
* @return int|null
*/
protected function getTimestampOfLastQueueRestart()
{
if ($this->cache) {
return $this->cache->get('illuminate:queue:restart');
}
}
/**
* Enable async signals for the process.
*
* @return void
*/
protected function listenForSignals()
{
if ($this->supportsAsyncSignals()) {
pcntl_async_signals(true);
pcntl_signal(SIGTERM, function () {
$this->shouldQuit = true;
});
pcntl_signal(SIGUSR2, function () {
$this->paused = true;
});
pcntl_signal(SIGCONT, function () {
$this->paused = false;
});
}
}
/**
* Determine if "async" signals are supported.
*
* @return bool
*/
protected function supportsAsyncSignals()
{
return version_compare(PHP_VERSION, '7.1.0') >= 0 &&
extension_loaded('pcntl');
}
/**
* Determine if the memory limit has been exceeded.
*
@@ -353,60 +563,48 @@ class Worker
/**
* Stop listening and bail out of the script.
*
* @param int $status
* @return void
*/
public function stop()
public function stop($status = 0)
{
$this->events->fire(new Events\WorkerStopping);
$this->events->dispatch(new Events\WorkerStopping);
die;
exit($status);
}
/**
* Kill the process.
*
* @param int $status
* @return void
*/
public function kill($status = 0)
{
$this->events->dispatch(new Events\WorkerStopping);
if (extension_loaded('posix')) {
posix_kill(getmypid(), SIGKILL);
}
exit($status);
}
/**
* Sleep the script for a given number of seconds.
*
* @param int $seconds
* @param int|float $seconds
* @return void
*/
public function sleep($seconds)
{
sleep($seconds);
}
/**
* Get the last queue restart timestamp, or null.
*
* @return int|null
*/
protected function getTimestampOfLastQueueRestart()
{
if ($this->cache) {
return $this->cache->get('illuminate:queue:restart');
if ($seconds < 1) {
usleep($seconds * 1000000);
} else {
sleep($seconds);
}
}
/**
* Determine if the queue worker should restart.
*
* @param int|null $lastRestart
* @return bool
*/
protected function queueShouldRestart($lastRestart)
{
return $this->getTimestampOfLastQueueRestart() != $lastRestart;
}
/**
* Set the exception handler to use in Daemon mode.
*
* @param \Illuminate\Contracts\Debug\ExceptionHandler $handler
* @return void
*/
public function setDaemonExceptionHandler(ExceptionHandler $handler)
{
$this->exceptions = $handler;
}
/**
* Set the cache repository implementation.
*

View File

@@ -0,0 +1,69 @@
<?php
namespace Illuminate\Queue;
class WorkerOptions
{
/**
* The number of seconds before a released job will be available.
*
* @var int
*/
public $delay;
/**
* The maximum amount of RAM the worker may consume.
*
* @var int
*/
public $memory;
/**
* The maximum number of seconds a child worker may run.
*
* @var int
*/
public $timeout;
/**
* The number of seconds to wait in between polling the queue.
*
* @var int
*/
public $sleep;
/**
* The maximum amount of times a job may be attempted.
*
* @var int
*/
public $maxTries;
/**
* Indicates if the worker should run in maintenance mode.
*
* @var bool
*/
public $force;
/**
* Create a new worker options instance.
*
* @param int $delay
* @param int $memory
* @param int $timeout
* @param int $sleep
* @param int $maxTries
* @param bool $force
* @return void
*/
public function __construct($delay = 0, $memory = 128, $timeout = 60, $sleep = 3, $maxTries = 0, $force = false)
{
$this->delay = $delay;
$this->sleep = $sleep;
$this->force = $force;
$this->memory = $memory;
$this->timeout = $timeout;
$this->maxTries = $maxTries;
}
}

View File

@@ -2,7 +2,7 @@
"name": "illuminate/queue",
"description": "The Illuminate Queue package.",
"license": "MIT",
"homepage": "http://laravel.com",
"homepage": "https://laravel.com",
"support": {
"issues": "https://github.com/laravel/framework/issues",
"source": "https://github.com/laravel/framework"
@@ -14,32 +14,35 @@
}
],
"require": {
"php": ">=5.5.9",
"illuminate/console": "5.2.*",
"illuminate/container": "5.2.*",
"illuminate/contracts": "5.2.*",
"illuminate/support": "5.2.*",
"nesbot/carbon": "~1.20",
"symfony/debug": "2.8.*|3.0.*",
"symfony/process": "2.8.*|3.0.*"
"php": ">=7.0",
"illuminate/console": "5.5.*",
"illuminate/container": "5.5.*",
"illuminate/contracts": "5.5.*",
"illuminate/database": "5.5.*",
"illuminate/filesystem": "5.5.*",
"illuminate/support": "5.5.*",
"symfony/debug": "~3.3",
"symfony/process": "~3.3"
},
"autoload": {
"psr-4": {
"Illuminate\\Queue\\": ""
},
"classmap": [
"IlluminateQueueClosure.php"
]
}
},
"extra": {
"branch-alias": {
"dev-master": "5.2-dev"
"dev-master": "5.5-dev"
}
},
"suggest": {
"ext-pcntl": "Required to use all features of the queue worker.",
"ext-posix": "Required to use all features of the queue worker.",
"aws/aws-sdk-php": "Required to use the SQS queue driver (~3.0).",
"illuminate/redis": "Required to use the Redis queue driver (5.2.*).",
"illuminate/redis": "Required to use the Redis queue driver (5.5.*).",
"pda/pheanstalk": "Required to use the Beanstalk queue driver (~3.0)."
},
"config": {
"sort-packages": true
},
"minimum-stability": "dev"
}