update 1.0.8.0
Commits for version update
This commit is contained in:
@@ -4,10 +4,8 @@ namespace Illuminate\Queue;
|
||||
|
||||
use DateTime;
|
||||
use Carbon\Carbon;
|
||||
use Illuminate\Support\Collection;
|
||||
use Illuminate\Database\Connection;
|
||||
use Illuminate\Queue\Jobs\DatabaseJob;
|
||||
use Illuminate\Database\Query\Expression;
|
||||
use Illuminate\Contracts\Queue\Queue as QueueContract;
|
||||
|
||||
class DatabaseQueue extends Queue implements QueueContract
|
||||
@@ -161,14 +159,10 @@ class DatabaseQueue extends Queue implements QueueContract
|
||||
{
|
||||
$queue = $this->getQueue($queue);
|
||||
|
||||
if (! is_null($this->expire)) {
|
||||
$this->releaseJobsThatHaveBeenReservedTooLong($queue);
|
||||
}
|
||||
|
||||
$this->database->beginTransaction();
|
||||
|
||||
if ($job = $this->getNextAvailableJob($queue)) {
|
||||
$this->markJobAsReserved($job->id);
|
||||
$job = $this->markJobAsReserved($job);
|
||||
|
||||
$this->database->commit();
|
||||
|
||||
@@ -180,38 +174,6 @@ class DatabaseQueue extends Queue implements QueueContract
|
||||
$this->database->commit();
|
||||
}
|
||||
|
||||
/**
|
||||
* Release the jobs that have been reserved for too long.
|
||||
*
|
||||
* @param string $queue
|
||||
* @return void
|
||||
*/
|
||||
protected function releaseJobsThatHaveBeenReservedTooLong($queue)
|
||||
{
|
||||
if (random_int(1, 10) < 10) {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->database->beginTransaction();
|
||||
|
||||
$stale = $this->database->table($this->table)
|
||||
->lockForUpdate()
|
||||
->where('queue', $this->getQueue($queue))
|
||||
->where('reserved', 1)
|
||||
->where('reserved_at', '<=', Carbon::now()->subSeconds($this->expire)->getTimestamp())
|
||||
->get();
|
||||
|
||||
$this->database->table($this->table)
|
||||
->whereIn('id', Collection::make($stale)->pluck('id')->all())
|
||||
->update([
|
||||
'reserved' => 0,
|
||||
'reserved_at' => null,
|
||||
'attempts' => new Expression('attempts + 1'),
|
||||
]);
|
||||
|
||||
$this->database->commit();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the next available job for the queue.
|
||||
*
|
||||
@@ -223,8 +185,10 @@ class DatabaseQueue extends Queue implements QueueContract
|
||||
$job = $this->database->table($this->table)
|
||||
->lockForUpdate()
|
||||
->where('queue', $this->getQueue($queue))
|
||||
->where('reserved', 0)
|
||||
->where('available_at', '<=', $this->getTime())
|
||||
->where(function ($query) {
|
||||
$this->isAvailable($query);
|
||||
$this->isReservedButExpired($query);
|
||||
})
|
||||
->orderBy('id', 'asc')
|
||||
->first();
|
||||
|
||||
@@ -232,16 +196,54 @@ class DatabaseQueue extends Queue implements QueueContract
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark the given job ID as reserved.
|
||||
* Modify the query to check for available jobs.
|
||||
*
|
||||
* @param string $id
|
||||
* @param \Illuminate\Database\Query\Builder $query
|
||||
* @return void
|
||||
*/
|
||||
protected function markJobAsReserved($id)
|
||||
protected function isAvailable($query)
|
||||
{
|
||||
$this->database->table($this->table)->where('id', $id)->update([
|
||||
'reserved' => 1, 'reserved_at' => $this->getTime(),
|
||||
$query->where(function ($query) {
|
||||
$query->where('reserved', 0);
|
||||
$query->where('available_at', '<=', $this->getTime());
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Modify the query to check for jobs that are reserved but have expired.
|
||||
*
|
||||
* @param \Illuminate\Database\Query\Builder $query
|
||||
* @return void
|
||||
*/
|
||||
protected function isReservedButExpired($query)
|
||||
{
|
||||
$expiration = Carbon::now()->subSeconds($this->expire)->getTimestamp();
|
||||
|
||||
$query->orWhere(function ($query) use ($expiration) {
|
||||
$query->where('reserved', 1);
|
||||
$query->where('reserved_at', '<=', $expiration);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark the given job ID as reserved.
|
||||
*
|
||||
* @param \stdClass $job
|
||||
* @return \stdClass
|
||||
*/
|
||||
protected function markJobAsReserved($job)
|
||||
{
|
||||
$job->reserved = 1;
|
||||
$job->attempts = $job->attempts + 1;
|
||||
$job->reserved_at = $this->getTime();
|
||||
|
||||
$this->database->table($this->table)->where('id', $job->id)->update([
|
||||
'reserved' => $job->reserved,
|
||||
'reserved_at' => $job->reserved_at,
|
||||
'attempts' => $job->attempts,
|
||||
]);
|
||||
|
||||
return $job;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -37,7 +37,6 @@ class DatabaseJob extends Job implements JobContract
|
||||
$this->queue = $queue;
|
||||
$this->database = $database;
|
||||
$this->container = $container;
|
||||
$this->job->attempts = $this->job->attempts + 1;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -4,11 +4,12 @@ namespace Illuminate\Queue;
|
||||
|
||||
use Closure;
|
||||
use DateTime;
|
||||
use Exception;
|
||||
use Illuminate\Support\Arr;
|
||||
use SuperClosure\Serializer;
|
||||
use Illuminate\Container\Container;
|
||||
use Illuminate\Contracts\Encryption\Encrypter;
|
||||
use Illuminate\Contracts\Queue\QueueableEntity;
|
||||
use Illuminate\Contracts\Encryption\Encrypter as EncrypterContract;
|
||||
|
||||
abstract class Queue
|
||||
{
|
||||
@@ -19,6 +20,13 @@ abstract class Queue
|
||||
*/
|
||||
protected $container;
|
||||
|
||||
/**
|
||||
* The encrypter implementation.
|
||||
*
|
||||
* @var \Illuminate\Contracts\Encryption\Encrypter
|
||||
*/
|
||||
protected $encrypter;
|
||||
|
||||
/**
|
||||
* Push a new job onto the queue.
|
||||
*
|
||||
@@ -73,7 +81,9 @@ abstract class Queue
|
||||
{
|
||||
if ($job instanceof Closure) {
|
||||
return json_encode($this->createClosurePayload($job, $data));
|
||||
} elseif (is_object($job)) {
|
||||
}
|
||||
|
||||
if (is_object($job)) {
|
||||
return json_encode([
|
||||
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
|
||||
'data' => ['commandName' => get_class($job), 'command' => serialize(clone $job)],
|
||||
@@ -144,7 +154,7 @@ abstract class Queue
|
||||
*/
|
||||
protected function createClosurePayload($job, $data)
|
||||
{
|
||||
$closure = $this->crypt->encrypt((new Serializer)->serialize($job));
|
||||
$closure = $this->getEncrypter()->encrypt((new Serializer)->serialize($job));
|
||||
|
||||
return ['job' => 'IlluminateQueueClosure', 'data' => compact('closure')];
|
||||
}
|
||||
@@ -201,13 +211,29 @@ abstract class Queue
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the encrypter instance.
|
||||
* Get the encrypter implementation.
|
||||
*
|
||||
* @param \Illuminate\Contracts\Encryption\Encrypter $crypt
|
||||
* @return \Illuminate\Contracts\Encryption\Encrypter
|
||||
*
|
||||
* @throws \Exception
|
||||
*/
|
||||
protected function getEncrypter()
|
||||
{
|
||||
if (is_null($this->encrypter)) {
|
||||
throw new Exception('No encrypter has been set on the Queue.');
|
||||
}
|
||||
|
||||
return $this->encrypter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the encrypter implementation.
|
||||
*
|
||||
* @param \Illuminate\Contracts\Encryption\Encrypter $encrypter
|
||||
* @return void
|
||||
*/
|
||||
public function setEncrypter(EncrypterContract $crypt)
|
||||
public function setEncrypter(Encrypter $encrypter)
|
||||
{
|
||||
$this->crypt = $crypt;
|
||||
$this->encrypter = $encrypter;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
"authors": [
|
||||
{
|
||||
"name": "Taylor Otwell",
|
||||
"email": "taylorotwell@gmail.com"
|
||||
"email": "taylor@laravel.com"
|
||||
}
|
||||
],
|
||||
"require": {
|
||||
|
||||
Reference in New Issue
Block a user