91 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
			
		
		
	
	
			91 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
| <?php namespace Clockwork\DataSource;
 | |
| 
 | |
| use Clockwork\Helpers\Serializer;
 | |
| use Clockwork\Helpers\StackTrace;
 | |
| use Clockwork\Request\Request;
 | |
| 
 | |
| use Illuminate\Queue\Queue;
 | |
| 
 | |
| // Data source for Laravel queue component, provides dispatched queue jobs
 | |
| class LaravelQueueDataSource extends DataSource
 | |
| {
 | |
| 	// Queue instance
 | |
| 	protected $queue;
 | |
| 
 | |
| 	// Dispatched queue jobs
 | |
| 	protected $jobs = [];
 | |
| 
 | |
| 	// Clockwork ID of the current request
 | |
| 	protected $currentRequestId;
 | |
| 
 | |
| 	// Create a new data source instance, takes a queue as an argument
 | |
| 	public function __construct(Queue $queue)
 | |
| 	{
 | |
| 		$this->queue = $queue;
 | |
| 	}
 | |
| 
 | |
| 	// Adds dispatched queue jobs to the request
 | |
| 	public function resolve(Request $request)
 | |
| 	{
 | |
| 		$request->queueJobs = array_merge($request->queueJobs, $this->getJobs());
 | |
| 
 | |
| 		return $request;
 | |
| 	}
 | |
| 
 | |
| 	// Reset the data source to an empty state, clearing any collected data
 | |
| 	public function reset()
 | |
| 	{
 | |
| 		$this->jobs = [];
 | |
| 	}
 | |
| 
 | |
| 	// Listen to the queue events
 | |
| 	public function listenToEvents()
 | |
| 	{
 | |
| 		$this->queue->createPayloadUsing(function ($connection, $queue, $payload) {
 | |
| 			$this->registerJob([
 | |
| 				'id'         => $id = (new Request)->id,
 | |
| 				'connection' => $connection,
 | |
| 				'queue'      => $queue,
 | |
| 				'name'       => $payload['displayName'],
 | |
| 				'data'       => isset($payload['data']['command']) ? $payload['data']['command'] : null,
 | |
| 				'maxTries'   => $payload['maxTries'],
 | |
| 				'timeout'    => $payload['timeout'],
 | |
| 				'time'       => microtime(true)
 | |
| 			]);
 | |
| 
 | |
| 			return [ 'clockwork_id' => $id, 'clockwork_parent_id' => $this->currentRequestId ];
 | |
| 		});
 | |
| 	}
 | |
| 
 | |
| 	// Set Clockwork ID of the current request
 | |
| 	public function setCurrentRequestId($requestId)
 | |
| 	{
 | |
| 		$this->currentRequestId = $requestId;
 | |
| 		return $this;
 | |
| 	}
 | |
| 
 | |
| 	// Collect a dispatched queue job
 | |
| 	protected function registerJob(array $job)
 | |
| 	{
 | |
| 		$trace = StackTrace::get()->resolveViewName();
 | |
| 
 | |
| 		$job = array_merge($job, [
 | |
| 			'trace' => (new Serializer)->trace($trace)
 | |
| 		]);
 | |
| 
 | |
| 		if ($this->passesFilters([ $job ])) {
 | |
| 			$this->jobs[] = $job;
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Get an array of dispatched queue jobs commands
 | |
| 	protected function getJobs()
 | |
| 	{
 | |
| 		return array_map(function ($query) {
 | |
| 			return array_merge($query, [
 | |
| 				'data' => isset($query['data']) ? (new Serializer)->normalize($query['data']) : null
 | |
| 			]);
 | |
| 		}, $this->jobs);
 | |
| 	}
 | |
| }
 | 
