Очереди
- Введение
- Создание заданий
- Мидлвары задания
- Диспетчеризация заданий
- Пакетная обработка заданий
- Замыкание очереди
- Запуск работника очереди
- Конфигурация супервизора
- Работа с неудачными заданиями
- Удаление заданий из очередей
- Мониторинг ваших очередей
- События задания
Введение
При создании веб-приложения у вас могут возникнуть некоторые задачи, такие как синтаксический анализ и сохранение загруженного CSV-файла, выполнение которых во время обычного веб-запроса занимает слишком много времени. К счастью, Laravel позволяет легко создавать задания в очереди, которые могут выполняться в фоновом режиме. Перемещая трудоемкие задачи в очередь, ваше приложение может реагировать на веб-запросы с молниеносной скоростью и обеспечивать лучший пользовательский интерфейс для ваших клиентов.
Очереди Laravel предоставляют унифицированный API очередей для различных серверных частей очередей, таких как Amazon SQS, Redis или даже реляционная база данных.
Параметры конфигурации очереди Laravel хранятся в файле конфигурации config/queue.php
вашего приложения. В этом файле вы найдете конфигурации подключения для каждого из драйверов очередей, включенных в структуру, включая базу данных, Amazon SQS, Redis и Beanstalkd, а также синхронный драйвер, который будет выполнять задания немедленно (для использования во время локальной разработки). Также включен драйвер null
очереди, который отбрасывает поставленные в очередь задания.
Laravel теперь предлагает Horizon, красивую панель инструментов и систему конфигурации для ваших очередей на основе Redis. Ознакомьтесь с полной документацией Horizon для получения дополнительной информации.
Соединения против Очередей
Прежде чем приступить к работе с очередями Laravel, важно понять разницу между «соединениями» и «очередями». В вашем файле конфигурации config/queue.php
есть массив конфигурации connections
. Этот параметр определяет подключения к серверным службам очередей, таким как Amazon SQS, Beanstalk или Redis. Однако любое заданное соединение с очередью может иметь несколько «очередей», которые можно рассматривать как разные стеки или груды заданий в очереди.
Обратите внимание, что каждый пример конфигурации соединения в файле конфигурации queue
содержит атрибут queue
. Это очередь по умолчанию, в которую будут отправляться задания, когда они отправляются в данное соединение. Другими словами, если вы отправляете задание, не определяя явным образом очередь, в которую оно должно быть отправлено, задание будет помещено в очередь, указанную в атрибуте queue
конфигурации соединения:
use App\Jobs\ProcessPodcast; // This job is sent to the default connection's default queue...ProcessPodcast::dispatch(); // This job is sent to the default connection's "emails" queue...ProcessPodcast::dispatch()->onQueue('emails');
Некоторым приложениям может не понадобиться помещать задания в несколько очередей, вместо этого они предпочитают иметь одну простую очередь. Однако размещение заданий в нескольких очередях может быть особенно полезно для приложений, которые хотят расставить приоритеты или сегментировать способы обработки заданий, поскольку обработчик очереди Laravel позволяет указать, какие очереди он должен обрабатывать по приоритету. Например, если вы помещаете задания в high
очередь, вы можете запустить рабочий процесс, который дает им более высокий приоритет обработки:
php artisan queue:work --queue=high,default
Примечания и предварительные требования к драйверу
База Данных
Чтобы использовать драйвер очереди database
, вам понадобится таблица базы данных для хранения заданий. Чтобы сгенерировать миграцию, создающую эту таблицу, запустите Artisan-команду queue:table
. После создания миграции вы можете перенести свою базу данных с помощью команды migrate
:
php artisan queue:table php artisan migrate
Наконец, не забудьте указать вашему приложению использовать драйвер database
, обновив переменную QUEUE_CONNECTION
в файле .env
вашего приложения:
QUEUE_CONNECTION=database
Redis
Чтобы использовать драйвер очереди redis
, вы должны настроить соединение с базой данных Redis в файле конфигурации config/database.php
.
Кластер Redis
Если ваше подключение к очереди Redis использует кластер Redis, имена ваших очередей должны содержать ключевой хэш-тег. Это необходимо для того, чтобы все ключи Redis для данной очереди были помещены в один и тот же хэш-слот:
'redis' => [ 'driver' => 'redis', 'connection' => 'default', 'queue' => '{default}', 'retry_after' => 90,],
Блокировка
При использовании очереди Redis вы можете использовать параметр конфигурации block_for
, чтобы указать, как долго драйвер должен ждать, пока задание станет доступным, прежде чем выполнить итерацию рабочего цикла и повторно опросить базу данных Redis.
Настройка этого значения в зависимости от загрузки вашей очереди может быть более эффективной, чем постоянный опрос базы данных Redis на наличие новых заданий. Например, вы можете установить значение 5
, чтобы указать, что драйвер должен заблокироваться на пять секунд, ожидая, пока задание станет доступным:
'redis' => [ 'driver' => 'redis', 'connection' => 'default', 'queue' => 'default', 'retry_after' => 90, 'block_for' => 5,],
Установка
block_for
на0
приведет к тому, что работники очереди будут блокироваться на неопределенный срок, пока задание не будет доступно. Это также предотвратит обработку таких сигналов, какSIGTERM
, до тех пор, пока не будет обработано следующее задание.
Другие требования к драйверу
Для перечисленных драйверов очередей необходимы следующие зависимости. Эти зависимости могут быть установлены через менеджер пакетов Composer:
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~4.0
- Redis:
predis/predis ~1.0
or phpredis PHP extension
Создание заданий
Генерация классов заданий
По умолчанию все поставленные в очередь задания для вашего приложения хранятся в каталоге app/Jobs
. Если каталог app/Jobs
не существует, он будет создан, когда вы запустите Artisan-команду make:job
:
php artisan make:job ProcessPodcast
Сгенерированный класс будет реализовывать интерфейс Illuminate\Contracts\Queue\ShouldQueue
, указывая Laravel, что задание должно быть помещено в очередь для асинхронного выполнения.
Заготовки заданий можно настроить с помощью публикации заглушек.
Структура класса
Классы заданий очень просты и обычно содержат только метод дескриптора handle
, который вызывается, когда задание обрабатывается очередью. Для начала давайте рассмотрим пример класса работы. В этом примере мы представим, что управляем службой публикации подкастов и нам нужно обработать загруженные файлы подкастов перед их публикацией:
<?php namespace App\Jobs; use App\Models\Podcast;use App\Services\AudioProcessor;use Illuminate\Bus\Queueable;use Illuminate\Contracts\Queue\ShouldQueue;use Illuminate\Foundation\Bus\Dispatchable;use Illuminate\Queue\InteractsWithQueue;use Illuminate\Queue\SerializesModels; class ProcessPodcast implements ShouldQueue{ use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; /** * The podcast instance. * * @var \App\Models\Podcast */ protected $podcast; /** * Create a new job instance. * * @param App\Models\Podcast $podcast * @return void */ public function __construct(Podcast $podcast) { $this->podcast = $podcast; } /** * Execute the job. * * @param App\Services\AudioProcessor $processor * @return void */ public function handle(AudioProcessor $processor) { // Process uploaded podcast... }}
Обратите внимание, что в этом примере мы смогли передать модель Eloquent непосредственно в конструктор задания, поставленного в очередь. Из-за трейта SerializesModels
, который использует задание, модели Eloquent и их загруженные отношения будут корректно сериализованы и десериализованы при обработке задания.
Если ваше задание в очереди принимает модель Eloquent в своем конструкторе, в очередь будет сериализован только идентификатор модели. Когда задание будет фактически обработано, система очередей автоматически повторно извлечет из базы данных полный экземпляр модели и его загруженные связи. Такой подход к сериализации модели позволяет отправлять гораздо меньшие полезные данные задания в драйвер очереди.
Внедрение зависимостей метода handle
Метод handle
вызывается, когда задание обрабатывается очередью. Обратите внимание, что мы можем указывать зависимости от метода handle
задания. Laravel сервисный контейнер автоматически внедряет эти зависимости.
Если вы хотите получить полный контроль над тем, как контейнер внедряет зависимости в метод handle
, вы можете использовать метод контейнера bindMethod
. Метод bindMethod
принимает обратный вызов, который получает задание и контейнер. В обратном вызове вы можете вызывать метод handle
, как пожелаете. Как правило, вы должны вызывать этот метод из метода boot
вашего App\Providers\AppServiceProvider
поставщика услуг:
use App\Jobs\ProcessPodcast;use App\Services\AudioProcessor; $this->app->bindMethod([ProcessPodcast::class, 'handle'], function ($job, $app) { return $job->handle($app->make(AudioProcessor::class));});
Двоичные данные, такие как необработанное содержимое изображения, должны быть переданы через функцию
base64_encode
, прежде чем они будут переданы заданию в очереди. В противном случае задание может неправильно сериализоваться в JSON при помещении в очередь.
Отношения очередей
Поскольку загруженные отношения также сериализуются, сериализованная строка задания иногда может стать довольно большой. Чтобы предотвратить сериализацию отношений, вы можете вызвать метод withoutRelations
в модели при установке значения свойства. Этот метод вернет экземпляр модели без загруженных отношений:
/** * Create a new job instance. * * @param \App\Models\Podcast $podcast * @return void */public function __construct(Podcast $podcast){ $this->podcast = $podcast->withoutRelations();}
Кроме того, при десериализации задания и повторном извлечении отношений модели из базы данных они будут извлечены полностью. Любые предыдущие ограничения отношений, которые применялись до сериализации модели в процессе постановки заданий в очередь, не будут применяться при десериализации задания. Поэтому, если вы хотите работать с подмножеством данного отношения, вам следует повторно ограничить это отношение в рамках задания, поставленного в очередь.
Уникальные задания
Для уникальных заданий требуется драйвер кэша, поддерживающий блокировки. В настоящее время драйверы
memcached
,redis
,dynamodb
,database
,file
иarray
поддерживают атомарные блокировки. Кроме того, ограничения уникальных заданий не применяются к заданиям в пакетах.
Иногда может потребоваться убедиться, что в любой момент времени в очереди находится только один экземпляр определенного задания. Вы можете сделать это, реализовав интерфейс ShouldBeUnique
в своем классе заданий. Этот интерфейс не требует определения каких-либо дополнительных методов в вашем классе:
<?php use Illuminate\Contracts\Queue\ShouldQueue;use Illuminate\Contracts\Queue\ShouldBeUnique; class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique{ ...}
В приведенном выше примере задание UpdateSearchIndex
уникально. Таким образом, задание не будет отправлено, если другой экземпляр задания уже находится в очереди и не завершил обработку.
В некоторых случаях вы можете захотеть определить особый «ключ», который делает задание уникальным, или вы можете указать время ожидания, по истечении которого задание больше не остается уникальным. Для этого вы можете определить свойства или методы uniqueId
и uniqueFor
в вашем классе заданий:
<?php use App\Product;use Illuminate\Contracts\Queue\ShouldQueue;use Illuminate\Contracts\Queue\ShouldBeUnique; class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique{ /** * The product instance. * * @var \App\Product */ public $product; /** * The number of seconds after which the job's unique lock will be released. * * @var int */ public $uniqueFor = 3600; /** * The unique ID of the job. * * @return string */ public function uniqueId() { return $this->product->id; }}
В приведенном выше примере задание UpdateSearchIndex
уникально по идентификатору продукта. Таким образом, любые новые отправки задания с тем же идентификатором продукта будут игнорироваться до тех пор, пока не завершится обработка существующего задания. Кроме того, если существующее задание не будет обработано в течение часа, уникальная блокировка будет снята, и в очередь можно будет отправить другое задание с тем же уникальным ключом.
Сохранение уникальности заданий до начала обработки
По умолчанию уникальные задания «разблокируются» после того, как задание завершит обработку или завершит все свои повторные попытки. Однако могут возникнуть ситуации, когда вы хотели бы, чтобы ваша работа была разблокирована непосредственно перед ее обработкой. Для этого ваша работа должна реализовать контракт ShouldBeUniqueUntilProcessing
вместо контракта ShouldBeUnique
:
<?php use App\Product;use Illuminate\Contracts\Queue\ShouldQueue;use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing; class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing{ // ...}
Уникальные блокировки заданий
За кулисами, когда отправляется задание ShouldBeUnique
, Laravel пытается получить lock с ключом uniqueId
. Если блокировка не получена, задание не отправляется. Эта блокировка снимается, когда задание завершает обработку или все его повторные попытки завершаются неудачно. По умолчанию Laravel будет использовать драйвер кеша по умолчанию для получения этой блокировки. Однако, если вы хотите использовать другой драйвер для получения блокировки, вы можете определить метод uniqueVia
, который возвращает драйвер кеша, который следует использовать:
use Illuminate\Support\Facades\Cache; class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique{ ... /** * Get the cache driver for the unique job lock. * * @return \Illuminate\Contracts\Cache\Repository */ public function uniqueVia() { return Cache::driver('redis'); }}
Если вам нужно только ограничить одновременную обработку задания, используйте мидлвары задания
WithoutOverlapping
.
Мидлвары задания
Мидлвар слоя для заданий позволяет обернуть пользовательскую логику вокруг выполнения заданий в очереди, уменьшив количество шаблонов в самих заданиях. Например, рассмотрим следующий метод handle
, который использует функции ограничения скорости Laravel Redis, чтобы разрешить обработку только одного задания каждые пять секунд:
use Illuminate\Support\Facades\Redis; /** * Execute the job. * * @return void */public function handle(){ Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () { info('Lock obtained...'); // Handle job... }, function () { // Could not obtain lock... return $this->release(5); });}
Хотя этот код действителен, реализация метода handle
становится зашумленной, поскольку она загромождена логикой ограничения скорости Redis. Кроме того, эта логика ограничения скорости должна быть продублирована для любых других заданий, которые мы хотим ограничить.
Вместо ограничения скорости в методе handle мы могли бы определить мидлвар задания, которое обрабатывает ограничение скорости. В Laravel нет места по умолчанию для мидлвара заданий, поэтому вы можете размещать мидлвар заданий в любом месте своего приложения. В этом примере мы поместим мидлвар app/Jobs/Middleware
:
<?php namespace App\Jobs\Middleware; use Illuminate\Support\Facades\Redis; class RateLimited{ /** * Process the queued job. * * @param mixed $job * @param callable $next * @return mixed */ public function handle($job, $next) { Redis::throttle('key') ->block(0)->allow(1)->every(5) ->then(function () use ($job, $next) { // Lock obtained... $next($job); }, function () use ($job) { // Could not obtain lock... $job->release(5); }); }}
Как видите, как и мидлвар маршрута, мидлвара заданий получает обрабатываемое задание и обратный вызов, который следует вызывать для продолжения обработки задания.
После создания мидлвара задания их можно присоединить к заданию, вернув их из метода задания middleware
. Этот метод не существует для заданий, созданных Artisan-командой make:job
, поэтому вам нужно будет вручную добавить его в свой класс заданий:
use App\Jobs\Middleware\RateLimited; /** * Get the middleware the job should pass through. * * @return array */public function middleware(){ return [new RateLimited];}
Мидлвар слоя заданий также можно назначать прослушивателям событий, почтовым сообщениям и уведомлениям.
Ограничение скорости
Хотя мы только что продемонстрировали, как написать собственный мидлвар для ограничения скорости, Laravel на самом деле включает в себя мидлвар для ограничения скорости, которое вы можете использовать для ограничения скорости работы. Как и ограничители скорости маршрута, ограничители скорости задания определяются с помощью метода for
фасада RateLimiter
.
Например, вы можете разрешить пользователям создавать резервные копии своих данных один раз в час, не накладывая такого ограничения на премиум-клиентов. Для этого вы можете определить RateLimiter
в методе boot
вашего AppServiceProvider
:
use Illuminate\Cache\RateLimiting\Limit;use Illuminate\Support\Facades\RateLimiter; /** * Bootstrap any application services. * * @return void */public function boot(){ RateLimiter::for('backups', function ($job) { return $job->user->vipCustomer() ? Limit::none() : Limit::perHour(1)->by($job->user->id); });}
В приведенном выше примере мы определили ограничение почасовой ставки; однако вы можете легко определить ограничение скорости на основе минут, используя метод perMinute
. Кроме того, вы можете передать любое значение, которое хотите, в метод by
ограничения скорости; однако это значение чаще всего используется для сегментации лимитов скорости по клиентам:
return Limit::perMinute(50)->by($job->user->id);
После того, как вы определили ограничение скорости, вы можете прикрепить ограничитель скорости к своему заданию резервного копирования с помощью мидлвара Illuminate\Queue\Middleware\RateLimited
. Каждый раз, когда задание превышает ограничение скорости, это мидлвар будет возвращать задание обратно в очередь с соответствующей задержкой, основанной на продолжительности ограничения скорости.
use Illuminate\Queue\Middleware\RateLimited; /** * Get the middleware the job should pass through. * * @return array */public function middleware(){ return [new RateLimited('backups')];}
Возврат задания с ограниченной скоростью в очередь все равно увеличивает общее количество заданий attempts
. Вы можете соответствующим образом настроить свойства tries
и maxExceptions
в своем классе заданий. Или вы можете использовать метод retryUntil
method, чтобы определить количество времени, по истечении которого выполнение задания больше не должно выполняться.
Если вы не хотите, чтобы задание выполнялось повторно, когда оно ограничено по скорости, вы можете использовать метод dontRelease
:
/** * Get the middleware the job should pass through. * * @return array */public function middleware(){ return [(new RateLimited('backups'))->dontRelease()];}
Если вы используете Redis, вы можете использовать мидлвар
Illuminate\Queue\Middleware\RateLimitedWithRedis
, который точно настроен для Redis и более эффективен, чем базовый мидлвар ограничения скорости.
Предотвращение дублирования заданий
Laravel включает в себя мидлвар Illuminate\Queue\Middleware\WithoutOverlapping
, которое позволяет предотвратить перекрытие заданий на основе произвольного ключа. Это может быть полезно, когда задание в очереди изменяет ресурс, который должен изменяться только одним заданием за раз.
Например, предположим, что у вас есть поставленное в очередь задание, которое обновляет кредитный рейтинг пользователя, и вы хотите предотвратить дублирование заданий по обновлению кредитного рейтинга для одного и того же идентификатора пользователя. Для этого вы можете вернуть мидлвар WithoutOverlapping
из метода middleware
вашего задания:
use Illuminate\Queue\Middleware\WithoutOverlapping; /** * Get the middleware the job should pass through. * * @return array */public function middleware(){ return [new WithoutOverlapping($this->user->id)];}
Любые перекрывающиеся задания будут возвращены в очередь. Вы также можете указать количество секунд, которое должно пройти, прежде чем будет предпринята повторная попытка освободить задание:
/** * Get the middleware the job should pass through. * * @return array */public function middleware(){ return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];}
Если вы хотите немедленно удалить любые перекрывающиеся задания, чтобы они не выполнялись повторно, вы можете использовать метод dontRelease
:
/** * Get the middleware the job should pass through. * * @return array */public function middleware(){ return [(new WithoutOverlapping($this->order->id))->dontRelease()];}
Мидлвар WithoutOverlapping
использует функцию атомарной блокировки Laravel. Иногда ваша работа может неожиданно завершиться ошибкой или истечь время ожидания таким образом, что блокировка не будет снята. Таким образом, вы можете явно указать время истечения блокировки, используя метод expireAfter
. Например, в приведенном ниже примере Laravel укажет снять блокировку WithoutOverlapping
через три минуты после начала обработки задания:
/** * Get the middleware the job should pass through. * * @return array */public function middleware(){ return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];}
Для мидлвара
WithoutOverlapping
требуется драйвер кеша, поддерживающий блокировки. В настоящее время драйверы кешаmemcached
,redis
,dynamodb
,database
,file
иarray
поддерживают атомарные блокировки.
Регулирование исключений
Laravel включает в себя мидлвар Illuminate\Queue\Middleware\ThrottlesExceptions
, которое позволяет блокировать исключения. Как только задание генерирует заданное количество исключений, все дальнейшие попытки выполнить задание откладываются до тех пор, пока не истечет указанный интервал времени. Это мидлвара слоя особенно полезно для заданий, которые взаимодействуют со сторонними службами, работа которых нестабильна.
Например, давайте представим поставленное в очередь задание, взаимодействующее со сторонним API, которое начинает генерировать исключения. Чтобы ограничить исключения, вы можете вернуть мидлвар ThrottlesExceptions
из метода middleware
вашего задания. Как правило, этот мидлвар должен быть связан с заданием, которое реализует попытки на основе времени:
use Illuminate\Queue\Middleware\ThrottlesExceptions; /** * Get the middleware the job should pass through. * * @return array */public function middleware(){ return [new ThrottlesExceptions(10, 5)];} /** * Determine the time at which the job should timeout. * * @return \DateTime */public function retryUntil(){ return now()->addMinutes(5);}
Первый аргумент конструктора, принимаемый мидлваром, — это количество исключений, которые задание может создать до того, как оно будет отрегулировано, а второй аргумент конструктора — это количество минут, которое должно пройти, прежде чем задание будет предпринято снова после того, как оно было отрегулировано. В приведенном выше примере кода, если задание выдает 10 исключений в течение 5 минут, мы будем ждать 5 минут, прежде чем повторить задание.
Когда задание выдает исключение, но пороговое значение исключения еще не достигнуто, задание обычно немедленно повторяется. Однако вы можете указать количество минут, на которое такое задание должно быть отложено, вызвав метод backoff
при подключении мидлвара к заданию:
use Illuminate\Queue\Middleware\ThrottlesExceptions; /** * Get the middleware the job should pass through. * * @return array */public function middleware(){ return [(new ThrottlesExceptions(10, 5))->backoff(5)];}
Внутри этот мидлвар использует систему кэширования Laravel для реализации ограничения скорости, а имя класса задания используется в качестве «ключа» кэша. Вы можете переопределить этот ключ, вызвав метод by
при подключении мидлвара к вашему заданию. Это может быть полезно, если у вас есть несколько заданий, взаимодействующих с одной и той же сторонней службой, и вы хотите, чтобы они совместно использовали общее регулирование:
use Illuminate\Queue\Middleware\ThrottlesExceptions; /** * Get the middleware the job should pass through. * * @return array */public function middleware(){ return [(new ThrottlesExceptions(10, 10))->by('key')];}
Если вы используете Redis, вы можете использовать мидлвар
Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis
, который точно настроен для Redis и более эффективно, чем базовый мидлвар для регулирования исключений.
Диспетчеризация заданий
После того, как вы написали свой класс задания, вы можете отправить его, используя метод dispatch
самого задания. Аргументы, переданные методу dispatch
, будут переданы конструктору задания:
<?php namespace App\Http\Controllers; use App\Http\Controllers\Controller;use App\Jobs\ProcessPodcast;use App\Models\Podcast;use Illuminate\Http\Request; class PodcastController extends Controller{ /** * Store a new podcast. * * @param \Illuminate\Http\Request $request * @return \Illuminate\Http\Response */ public function store(Request $request) { $podcast = Podcast::create(...); // ... ProcessPodcast::dispatch($podcast); }}
Если вы хотите условно отправить задание, вы можете использовать методы dispatchIf
и dispatchUnless
:
ProcessPodcast::dispatchIf($accountActive, $podcast); ProcessPodcast::dispatchUnless($accountSuspended, $podcast);
Отложенная отправка
Если вы хотите указать, что задание не должно быть немедленно доступно для обработки обработчиком очереди, вы можете использовать метод delay
при отправке задания. Например, давайте укажем, что задание не должно быть доступно для обработки в течение 10 минут после его отправки:
<?php namespace App\Http\Controllers; use App\Http\Controllers\Controller;use App\Jobs\ProcessPodcast;use App\Models\Podcast;use Illuminate\Http\Request; class PodcastController extends Controller{ /** * Store a new podcast. * * @param \Illuminate\Http\Request $request * @return \Illuminate\Http\Response */ public function store(Request $request) { $podcast = Podcast::create(...); // ... ProcessPodcast::dispatch($podcast) ->delay(now()->addMinutes(10)); }}
Служба очередей Amazon SQS имеет максимальное время задержки 15 минут.
Отправка после отправки ответа в браузер
В качестве альтернативы метод dispatchAfterResponse
откладывает отправку задания до тех пор, пока HTTP-ответ не будет отправлен в браузер пользователя. Это по-прежнему позволит пользователю начать использовать приложение, даже если задание в очереди все еще выполняется. Обычно это следует использовать только для заданий, которые занимают около секунды, например, для отправки электронной почты. Поскольку они обрабатываются в рамках текущего HTTP-запроса, задания, отправленные таким образом, не требуют, чтобы для их обработки был запущен обработчик очереди:
use App\Jobs\SendNotification; SendNotification::dispatchAfterResponse();
Вы также можете dispatch
замыкание и связать метод afterResponse
с помощником dispatch
, чтобы выполнить замыкание после того, как HTTP-ответ был отправлен в браузер:
use App\Mail\WelcomeMessage;use Illuminate\Support\Facades\Mail; dispatch(function () { Mail::to('taylor@example.com')->send(new WelcomeMessage);})->afterResponse();
Синхронная диспетчеризация
Если вы хотите отправить задание немедленно (синхронно), вы можете использовать метод dispatchSync
. При использовании этого метода задание не будет поставлено в очередь и будет выполнено немедленно в рамках текущего процесса:
<?php namespace App\Http\Controllers; use App\Http\Controllers\Controller;use App\Jobs\ProcessPodcast;use App\Models\Podcast;use Illuminate\Http\Request; class PodcastController extends Controller{ /** * Store a new podcast. * * @param \Illuminate\Http\Request $request * @return \Illuminate\Http\Response */ public function store(Request $request) { $podcast = Podcast::create(...); // Create podcast... ProcessPodcast::dispatchSync($podcast); }}
Задания и транзакции с базами данных
Хотя отправлять задания в рамках транзакций базы данных вполне нормально, вам следует позаботиться о том, чтобы ваше задание действительно могло выполняться успешно. При отправке задания внутри транзакции возможно, что задание будет обработано рабочим до того, как транзакция будет зафиксирована. Когда это происходит, любые обновления, которые вы внесли в модели или записи базы данных во время транзакции базы данных, могут еще не отражаться в базе данных. Кроме того, любые модели или записи базы данных, созданные в рамках транзакции, могут не существовать в базе данных.
К счастью, Laravel предоставляет несколько методов решения этой проблемы. Во-первых, вы можете установить параметр подключения after_commit
в массиве конфигурации вашего соединения с очередью:
'redis' => [ 'driver' => 'redis', // ... 'after_commit' => true,],
Когда опция after_commit
имеет значение true
, вы можете отправлять задания в рамках транзакций базы данных; однако Laravel будет ждать, пока все открытые транзакции базы данных не будут зафиксированы, прежде чем фактически отправить задание. Конечно, если в настоящее время нет открытых транзакций базы данных, задание будет отправлено немедленно.
Если транзакция откатывается из-за исключения, возникшего во время транзакции, отправленные задания, которые были отправлены во время этой транзакции, будут отброшены.
Установка для параметра конфигурации
after_commit
значенияtrue
также приведет к тому, что любые прослушиватели событий, почтовые сообщения, уведомления и широковещательные события, находящиеся в очереди, будут отправлены после фиксации всех открытых транзакций базы данных.
Указание встроенного поведения Commit Dispatch
Если вы не установите для параметра конфигурации подключения к очереди after_commit
значение true
, вы все равно можете указать, что определенное задание должно быть отправлено после фиксации всех открытых транзакций базы данных. Для этого вы можете связать метод afterCommit
с вашей операцией отправки:
use App\Jobs\ProcessPodcast; ProcessPodcast::dispatch($podcast)->afterCommit();
Аналогичным образом, если для параметра конфигурации after_commit
установлено значение true
, вы можете указать, что конкретное задание должно быть отправлено немедленно, не дожидаясь фиксации каких-либо открытых транзакций базы данных:
ProcessPodcast::dispatch($podcast)->beforeCommit();
Цепочка заданий
Цепочка заданий позволяет указать список заданий в очереди, которые должны выполняться последовательно после успешного выполнения основного задания. Если одно задание в последовательности завершается с ошибкой, остальные задания выполняться не будут. Чтобы выполнить цепочку заданий в очереди, вы можете использовать метод chain
, предоставляемый фасадом Bus
. Командная шина Laravel — это компонент более низкого уровня, над которым построена диспетчеризация заданий в очереди:
use App\Jobs\OptimizePodcast;use App\Jobs\ProcessPodcast;use App\Jobs\ReleasePodcast;use Illuminate\Support\Facades\Bus; Bus::chain([ new ProcessPodcast, new OptimizePodcast, new ReleasePodcast,])->dispatch();
В дополнение к цепочке экземпляров класса задания вы также можете связать замыкания:
Bus::chain([ new ProcessPodcast, new OptimizePodcast, function () { Podcast::update(...); },])->dispatch();
Удаление заданий с помощью метода
$this->delete()
внутри задания не помешает обработке связанных заданий. Цепочка прекратит выполнение только в случае сбоя задания в цепочке.
Цепочка соединения и очередь
Если вы хотите указать соединение и очередь, которые должны использоваться для связанных заданий, вы можете использовать методы onConnection
и onQueue
. Эти методы указывают соединение с очередью и имя очереди, которые следует использовать, если заданию в очереди явно не назначено другое соединение/очередь:
Bus::chain([ new ProcessPodcast, new OptimizePodcast, new ReleasePodcast,])->onConnection('redis')->onQueue('podcasts')->dispatch();
Неисправности цепи
При объединении заданий в цепочку вы можете использовать метод catch
, чтобы указать замыкание, которое должно быть вызвано, если задание в цепочке завершается неудачно. Данный обратный вызов получит экземпляр Throwable
, вызвавший сбой задания:
use Illuminate\Support\Facades\Bus;use Throwable; Bus::chain([ new ProcessPodcast, new OptimizePodcast, new ReleasePodcast,])->catch(function (Throwable $e) { // A job within the chain has failed...})->dispatch();
Настройка очереди и соединения
Отправка в определенную очередь
Помещая задания в разные очереди, вы можете «классифицировать» поставленные в очередь задания и даже расставить приоритеты в отношении того, сколько работников вы назначаете в разные очереди. Имейте в виду, что это не отправляет задания в разные «соединения» очередей, как определено в файле конфигурации вашей очереди, а только в определенные очереди в рамках одного соединения. Чтобы указать очередь, используйте метод onQueue
при отправке задания:
<?php namespace App\Http\Controllers; use App\Http\Controllers\Controller;use App\Jobs\ProcessPodcast;use App\Models\Podcast;use Illuminate\Http\Request; class PodcastController extends Controller{ /** * Store a new podcast. * * @param \Illuminate\Http\Request $request * @return \Illuminate\Http\Response */ public function store(Request $request) { $podcast = Podcast::create(...); // Create podcast... ProcessPodcast::dispatch($podcast)->onQueue('processing'); }}
Кроме того, вы можете указать очередь задания, вызвав метод onQueue
в конструкторе задания:
<?php namespace App\Jobs; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; class ProcessPodcast implements ShouldQueue{ use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; /** * Create a new job instance. * * @return void */ public function __construct() { $this->onQueue('processing'); }}
Отправка на конкретное соединение
Если ваше приложение взаимодействует с несколькими подключениями к очередям, вы можете указать, в какое подключение отправлять задание, используя метод onConnection
:
<?php namespace App\Http\Controllers; use App\Http\Controllers\Controller;use App\Jobs\ProcessPodcast;use App\Models\Podcast;use Illuminate\Http\Request; class PodcastController extends Controller{ /** * Store a new podcast. * * @param \Illuminate\Http\Request $request * @return \Illuminate\Http\Response */ public function store(Request $request) { $podcast = Podcast::create(...); // Create podcast... ProcessPodcast::dispatch($podcast)->onConnection('sqs'); }}
Вы можете связать методы onConnection
и onQueue
вместе, чтобы указать соединение и очередь для задания:
ProcessPodcast::dispatch($podcast) ->onConnection('sqs') ->onQueue('processing');
Кроме того, вы можете указать соединение задания, вызвав метод onConnection
в конструкторе задания:
<?php namespace App\Jobs; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; class ProcessPodcast implements ShouldQueue{ use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; /** * Create a new job instance. * * @return void */ public function __construct() { $this->onConnection('sqs'); }}
Указание максимального числа попыток задания/значения тайм-аута
Максимальное количество попыток
Если одно из ваших заданий в очереди сталкивается с ошибкой, вы, вероятно, не хотите, чтобы оно повторялось бесконечно. Поэтому Laravel предоставляет различные способы указать, сколько раз или как долго можно пытаться выполнить задание.
Один из подходов к указанию максимального количества попыток выполнения задания — использование переключателя --tries
в командной строке Artisan. Это будет применяться ко всем заданиям, обрабатываемым работником, если только обрабатываемое задание не указывает более конкретное количество попыток его выполнения:
php artisan queue:work --tries=3
Если задание превышает максимальное количество попыток, оно будет считаться «неудачным». Дополнительную информацию об обработке невыполненных заданий см. в документации по невыполненным заданиям.
Вы можете применить более детальный подход, определив максимальное количество попыток выполнения задания в самом классе заданий. Если для задания указано максимальное количество попыток, оно будет иметь приоритет над значением --tries
, указанным в командной строке:
<?php namespace App\Jobs; class ProcessPodcast implements ShouldQueue{ /** * The number of times the job may be attempted. * * @var int */ public $tries = 5;}
Попытки, основанные на времени
В качестве альтернативы определению того, сколько раз задание может быть выполнено до того, как оно завершится ошибкой, вы можете определить время, после которого выполнение задания больше не должно выполняться. Это позволяет выполнять задание любое количество раз в течение заданного периода времени. Чтобы определить время, когда задание больше не должно выполняться, добавьте в свой класс задания метод retryUntil
. Этот метод должен возвращать экземпляр DateTime
:
/** * Determine the time at which the job should timeout. * * @return \DateTime */public function retryUntil(){ return now()->addMinutes(10);}
Вы также можете определить свойство
tries
или методretryUntil
для ваших прослушивателей событий в очереди.
Максимальное количество исключений
Иногда вы можете захотеть указать, что задание может выполняться много раз, но должно завершиться ошибкой, если повторные попытки инициируются заданным количеством необработанных исключений (в отличие от прямого освобождения методом release
). Для этого вы можете определить свойство maxExceptions
в своем классе заданий:
<?php namespace App\Jobs; use Illuminate\Support\Facades\Redis; class ProcessPodcast implements ShouldQueue{ /** * The number of times the job may be attempted. * * @var int */ public $tries = 25; /** * The maximum number of unhandled exceptions to allow before failing. * * @var int */ public $maxExceptions = 3; /** * Execute the job. * * @return void */ public function handle() { Redis::throttle('key')->allow(10)->every(60)->then(function () { // Lock obtained, process the podcast... }, function () { // Unable to obtain lock... return $this->release(10); }); }}
В этом примере задание высвобождается на десять секунд, если приложению не удается получить блокировку Redis, и оно будет продолжать выполняться до 25 раз. Однако задание завершится сбоем, если оно вызовет три необработанных исключения.
Тайм-аут
Расширение PHP
pcntl
должно быть установлено, чтобы указать время ожидания задания.
Часто вы примерно знаете, сколько времени, по вашему мнению, займет выполнение поставленных в очередь заданий. По этой причине Laravel позволяет вам указать значение «тайм-аут». Если задание обрабатывается дольше, чем количество секунд, указанное значением времени ожидания, рабочий процесс, обрабатывающий задание, завершится с ошибкой. Как правило, рабочий процесс автоматически перезапускается диспетчером процессов, настроенным на вашем сервере.
Максимальное количество секунд, в течение которых могут выполняться задания, можно указать с помощью параметра --timeout
в командной строке Artisan:
php artisan queue:work --timeout=30
Если задание превышает максимальное количество попыток из-за постоянного истечения времени ожидания, оно будет помечено как неудавшееся.
Вы также можете определить максимальное количество секунд, в течение которых задание должно выполняться в самом классе задания. Если тайм-аут указан в задании, он будет иметь приоритет над любым тайм-аутом, указанным в командной строке:
<?php namespace App\Jobs; class ProcessPodcast implements ShouldQueue{ /** * The number of seconds the job can run before timing out. * * @var int */ public $timeout = 120;}
Иногда процессы блокировки ввода-вывода, такие как сокеты или исходящие HTTP-соединения, могут не соблюдать указанный вами тайм-аут. Поэтому при использовании этих функций вы всегда должны пытаться указать тайм-аут, используя их API. Например, при использовании Guzzle всегда следует указывать время ожидания подключения и запроса.
Сбой по тайм-ауту
Если вы хотите указать, что задание должно быть помечено как failed по истечении времени ожидания, вы можете определить свойство $failOnTimeout
в классе задания:
/** * Indicate if the job should be marked as failed on timeout. * * @var bool */public $failOnTimeout = true;
Обработка ошибок
Если во время обработки задания возникнет исключение, задание будет автоматически возвращено в очередь, чтобы его можно было повторить. Задание будет по-прежнему освобождаться до тех пор, пока не будет предпринято максимальное количество попыток, разрешенное вашим приложением. Максимальное количество попыток определяется переключателем --tries
, используемым в Artisan-команде queue:work
. В качестве альтернативы максимальное количество попыток может быть определено в самом классе задания. Дополнительную информацию о запуске обработчика очереди можно найти ниже.
Выпуск задания вручную
Иногда вам может потребоваться вручную вернуть задание в очередь, чтобы его можно было повторить позже. Вы можете сделать это, вызвав метод release
:
/** * Execute the job. * * @return void */public function handle(){ // ... $this->release();}
По умолчанию метод release
возвращает задание в очередь для немедленной обработки. Однако, передав целое число методу release
, вы можете указать очереди не делать задание доступным для обработки, пока не истечет заданное количество секунд:
$this->release(10);
Сбой задания вручную
Иногда вам может потребоваться вручную пометить задание как «неудачное». Для этого вы можете вызвать метод fail
:
/** * Execute the job. * * @return void */public function handle(){ // ... $this->fail();}
Если вы хотите пометить свою работу как неудачную из-за исключения, которое вы перехватили, вы можете передать исключение методу fail
:
$this->fail($exception);
Для получения дополнительной информации о невыполненных заданиях ознакомьтесь с документацией по устранению сбоев заданий.
Пакетная обработка заданий
Функция пакетной обработки заданий Laravel позволяет вам легко выполнять пакет заданий, а затем выполнять некоторые действия после завершения выполнения пакета заданий. Прежде чем приступить к работе, вы должны создать миграцию базы данных, чтобы построить таблицу, содержащую метаинформацию о ваших пакетах заданий, например процент их выполнения. Эту миграцию можно сгенерировать с помощью Artisan-команды queue:batches-table
:
php artisan queue:batches-table php artisan migrate
Определение пакетных заданий
Чтобы определить пакетное задание, вы должны создать задание с очередью как обычно; тем не менее, вы должны добавить трейт Illuminate\Bus\Batchable
к классу задания. Этот трейт предоставляет доступ к методу batch
, который можно использовать для получения текущего пакета, в котором выполняется задание:
<?php namespace App\Jobs; use Illuminate\Bus\Batchable;use Illuminate\Bus\Queueable;use Illuminate\Contracts\Queue\ShouldQueue;use Illuminate\Foundation\Bus\Dispatchable;use Illuminate\Queue\InteractsWithQueue;use Illuminate\Queue\SerializesModels; class ImportCsv implements ShouldQueue{ use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels; /** * Execute the job. * * @return void */ public function handle() { if ($this->batch()->cancelled()) { // Determine if the batch has been cancelled... return; } // Import a portion of the CSV file... }}
Отправка пакетов
Чтобы отправить пакет заданий, вы должны использовать метод batch
фасада Bus
. Конечно, пакетная обработка в первую очередь полезна в сочетании с обратными вызовами завершения. Таким образом, вы можете использовать методы then
, catch
и finally
, чтобы определить обратные вызовы завершения для пакета. Каждый из этих обратных вызовов получит экземпляр Illuminate\Bus\Batch
при вызове. В этом примере мы представим, что ставим в очередь пакет заданий, каждое из которых обрабатывает заданное количество строк из CSV-файла:
use App\Jobs\ImportCsv;use Illuminate\Bus\Batch;use Illuminate\Support\Facades\Bus;use Throwable; $batch = Bus::batch([ new ImportCsv(1, 100), new ImportCsv(101, 200), new ImportCsv(201, 300), new ImportCsv(301, 400), new ImportCsv(401, 500),])->then(function (Batch $batch) { // All jobs completed successfully...})->catch(function (Batch $batch, Throwable $e) { // First batch job failure detected...})->finally(function (Batch $batch) { // The batch has finished executing...})->dispatch(); return $batch->id;
Идентификатор пакета, доступ к которому можно получить через свойство $batch->id
, можно использовать для запроса командной шины Laravel для получения информации о пакете после его отправки.
Поскольку пакетные обратные вызовы сериализуются и выполняются позже очередью Laravel, вы не должны использовать переменную
$this
в обратных вызовах.
Именование пакетов
Некоторые инструменты, такие как Laravel Horizon и Laravel Telescope, могут предоставлять более удобную отладочную информацию для пакетов, если пакеты имеют имена. Чтобы присвоить пакету произвольное имя, вы можете вызвать метод name
при определении пакета:
$batch = Bus::batch([ // ...])->then(function (Batch $batch) { // All jobs completed successfully...})->name('Import CSV')->dispatch();
Пакетное соединение и очередь
Если вы хотите указать соединение и очередь, которые должны использоваться для пакетных заданий, вы можете использовать методы onConnection
и onQueue
. Все пакетные задания должны выполняться в одном и том же соединении и очереди:
$batch = Bus::batch([ // ...])->then(function (Batch $batch) { // All jobs completed successfully...})->onConnection('redis')->onQueue('imports')->dispatch();
Цепочки внутри пакетов
Вы можете определить набор сцепленных заданий в пакете, поместив связанные задания в массив. Например, мы можем выполнить две цепочки заданий параллельно и выполнить обратный вызов, когда обе цепочки заданий закончат обработку:
use App\Jobs\ReleasePodcast;use App\Jobs\SendPodcastReleaseNotification;use Illuminate\Bus\Batch;use Illuminate\Support\Facades\Bus; Bus::batch([ [ new ReleasePodcast(1), new SendPodcastReleaseNotification(1), ], [ new ReleasePodcast(2), new SendPodcastReleaseNotification(2), ],])->then(function (Batch $batch) { // ...})->dispatch();
Добавление заданий в пакеты
Иногда бывает полезно добавить дополнительные задания в пакет из самого пакетного задания. Этот шаблон может быть полезен, когда вам нужно пакетировать тысячи заданий, которые могут занять слишком много времени для отправки во время веб-запроса. Таким образом, вместо этого вы можете захотеть отправить первоначальный пакет заданий «загрузчик», которые гидратируют пакет еще большим количеством заданий:
$batch = Bus::batch([ new LoadImportBatch, new LoadImportBatch, new LoadImportBatch,])->then(function (Batch $batch) { // All jobs completed successfully...})->name('Import Contacts')->dispatch();
В этом примере мы будем использовать задание LoadImportBatch
, чтобы заполнить пакет дополнительными заданиями. Для этого мы можем использовать метод add
для пакетного экземпляра, к которому можно получить доступ через пакетный метод задания batch
:
use App\Jobs\ImportContacts;use Illuminate\Support\Collection; /** * Execute the job. * * @return void */public function handle(){ if ($this->batch()->cancelled()) { return; } $this->batch()->add(Collection::times(1000, function () { return new ImportContacts; }));}
Вы можете добавлять задания в пакет только из задания, которое принадлежит к тому же пакету.
Проверка пакетов
Экземпляр Illuminate\Bus\Batch
, предоставляемый для обратных вызовов пакетного завершения, имеет множество свойств и методов, помогающих вам взаимодействовать с данным пакетом заданий и проверять его:
// UUID партии...$batch->id; // Название партии (если применимо)...$batch->name; // Количество заданий, назначенных на партию...$batch->totalJobs; // Количество заданий, не обработанных очередью...$batch->pendingJobs; // Количество невыполненных работ...$batch->failedJobs; // Количество заданий, которые были обработаны на данный момент...$batch->processedJobs(); // Процент завершения партии (0-100)...$batch->progress(); // Указывает, завершилось ли выполнение пакета...$batch->finished(); // Отменить выполнение пакета...$batch->cancel(); // Указывает, была ли партия отменена...$batch->cancelled();
Возврат пакетов из маршрутов
Все экземпляры Illuminate\Bus\Batch
сериализуемы в формате JSON, что означает, что вы можете возвращать их непосредственно из одного из маршрутов вашего приложения для получения полезных данных JSON, содержащих информацию о пакете, включая ход его выполнения. Это позволяет удобно отображать информацию о ходе выполнения пакета в пользовательском интерфейсе приложения.
Чтобы получить пакет по его идентификатору, вы можете использовать метод Bus
фасада findBatch
:
use Illuminate\Support\Facades\Bus;use Illuminate\Support\Facades\Route; Route::get('/batch/{batchId}', function (string $batchId) { return Bus::findBatch($batchId);});
Отмена пакетов
Иногда вам может понадобиться отменить выполнение данного пакета. Этого можно добиться, вызвав метод cancel
для экземпляра Illuminate\Bus\Batch
:
/** * Execute the job. * * @return void */public function handle(){ if ($this->user->exceedsImportLimit()) { return $this->batch()->cancel(); } if ($this->batch()->cancelled()) { return; }}
Как вы, возможно, заметили в предыдущих примерах, пакетные задания обычно должны проверять, был ли пакет отменен в начале их метода handle
:
/** * Execute the job. * * @return void */public function handle(){ if ($this->batch()->cancelled()) { return; } // Continue processing...}
Пакетные сбои
При сбое пакетного задания будет вызван обратный вызов catch
(если он назначен). Этот обратный вызов вызывается только для первого задания, которое не удалось выполнить в пакете.
Разрешение сбоев
Когда задание в пакете терпит неудачу, Laravel автоматически помечает пакет как «отмененный». При желании вы можете отключить это поведение, чтобы сбой задания не помечал автоматически пакет как отмененный. Этого можно добиться, вызвав метод allowFailures
при отправке пакета:
$batch = Bus::batch([ // ...])->then(function (Batch $batch) { // All jobs completed successfully...})->allowFailures()->dispatch();
Повтор неудачных пакетных заданий
Для удобства Laravel предоставляет Artisan-команду queue:retry-batch
, которая позволяет вам легко повторить все невыполненные задания для данного пакета. Команда queue:retry-batch
принимает UUID пакета, чьи неудачные задания следует повторить:
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5
Обрезка пакетов
Без обрезки таблица job_batches
может очень быстро накапливать записи. Чтобы избежать этого, вы должны schedule Artisan-команду queue:prune-batches
запускать ежедневно:
$schedule->command('queue:prune-batches')->daily();
По умолчанию все готовые пакеты старше 24 часов будут удалены. Вы можете использовать опцию hours
при вызове команды, чтобы определить, как долго хранить пакетные данные. Например, следующая команда удалит все пакеты, завершенные более 48 часов назад:
$schedule->command('queue:prune-batches --hours=48')->daily();
Иногда в вашей таблице jobs_batches
могут накапливаться записи о пакетах, которые никогда не завершались успешно, например, о пакетах, в которых задание завершилось неудачно, и это задание никогда не было успешно повторено. Вы можете указать команде queue:prune-batches
удалить эти незавершенные пакетные записи, используя опцию unfinished
:
$schedule->command('queue:prune-batches --hours=48 --unfinished=72')->daily();
Замыкание очереди
Вместо отправки класса задания в очередь вы также можете отправить замыкание. Это отлично подходит для быстрых и простых задач, которые необходимо выполнить за пределами текущего цикла запроса. При отправке замыканий в очередь содержимое кода замыкания криптографически подписывается, чтобы его нельзя было изменить при передаче:
$podcast = App\Podcast::find(1); dispatch(function () use ($podcast) { $podcast->publish();});
Используя метод catch
, вы можете предоставить замыкание, которое должно быть выполнено, если замыкание в очереди не может быть успешно завершено после исчерпания всех настроенных попыток повтора вашей очереди:
use Throwable; dispatch(function () use ($podcast) { $podcast->publish();})->catch(function (Throwable $e) { // This job has failed...});
Запуск работника очереди
Команда queue:work
Laravel включает Artisan-команду, которая запускает обработчик очереди и обрабатывает новые задания по мере их поступления в очередь. Вы можете запустить рабочего с помощью Artisan-команды queue:work
. Обратите внимание, что после запуска команды queue:work
она будет продолжать выполняться до тех пор, пока не будет остановлена вручную или пока вы не закроете терминал:
php artisan queue:work
Чтобы процесс
queue:work
работал постоянно в фоновом режиме, вы должны использовать монитор процесса, такой как Супервизор, чтобы гарантировать, что рабочий процесс очереди не остановится.
Помните, что рабочие очереди — это долгоживущие процессы, которые хранят состояние загруженного приложения в памяти. В результате они не заметят изменений в вашей кодовой базе после их запуска. Поэтому в процессе развертывания обязательно перезапустите обработчики очереди. Кроме того, помните, что любое статическое состояние, созданное или измененное вашим приложением, не будет автоматически сбрасываться между заданиями.
В качестве альтернативы вы можете запустить команду queue:listen
. При использовании команды queue:listen
вам не нужно вручную перезапускать рабочий процесс, когда вы хотите перезагрузить обновленный код или сбросить состояние приложения; однако эта команда значительно менее эффективна, чем команда queue:work
:
php artisan queue:listen
Запуск нескольких обработчиков очереди
Чтобы назначить несколько рабочих в очередь и одновременно обрабатывать задания, вы должны просто запустить несколько процессов queue:work
. Это можно сделать либо локально с помощью нескольких вкладок в вашем терминале, либо в рабочей среде, используя настройки конфигурации вашего диспетчера процессов. При использовании Супервизора вы можете использовать значение конфигурации numprocs
.
Указание соединения и очереди
Вы также можете указать, какое соединение с очередью должен использовать работник. Имя соединения, переданное команде work
, должно соответствовать одному из соединений, определенных в вашем конфигурационном файле config/queue.php
:
php artisan queue:work redis
По умолчанию команда queue:work
обрабатывает задания только для очереди по умолчанию для данного соединения. Однако вы можете дополнительно настроить обработчик очередей, обрабатывая только определенные очереди для данного соединения. Например, если все ваши электронные письма обрабатываются в очереди emails
в вашем соединении очереди redis
, вы можете ввести следующую команду, чтобы запустить работника, который обрабатывает только эту очередь:
php artisan queue:work redis --queue=emails
Обработка указанного количества заданий
Опция --once
может быть использована для указания рабочему процессу обрабатывать только одно задание из очереди:
php artisan queue:work --once
Опция --max-jobs
может быть использована для указания рабочему процессу обработать заданное количество заданий и затем выйти. Эта опция может быть полезна в сочетании с Supervisor, чтобы ваши рабочие процессы автоматически перезапускались после обработки заданного количества заданий, освобождая любую память, которую они могли накопить:
php artisan queue:work --max-jobs=1000
Обработка всех заданий в очереди и последующий выход
Параметр --stop-when-empty
может использоваться, чтобы указать рабочему процессу обрабатывать все задания, а затем корректно завершить работу. Этот параметр может быть полезен при обработке очередей Laravel в контейнере Docker, если вы хотите закрыть контейнер после того, как очередь пуста:
php artisan queue:work --stop-when-empty
Обработка заданий в течение заданного количества секунд
Опция --max-time
может быть использована для указания рабочему процессу обрабатывать задания в течение заданного количества секунд, а затем выйти. Эта опция может быть полезна в сочетании с Supervisor, чтобы ваши рабочие процессы автоматически перезапускались после обработки заданий в течение заданного периода времени, освобождая любую память, которую они могли накопить:
// Process jobs for one hour and then exit...php artisan queue:work --max-time=3600
Продолжительность сна работника
Когда задания доступны в очереди, рабочий процесс будет продолжать обрабатывать задания без задержки между ними. Однако параметр sleep
определяет, сколько секунд рабочий процесс будет «спать», если нет доступных новых заданий. Во время сна воркер не будет обрабатывать новые задания — задания будут обработаны после того, как воркер снова проснется.
php artisan queue:work --sleep=3
Рекомендации по ресурсам
Обработчики очереди демонов не «перезагружают» платформу перед обработкой каждого задания. Поэтому вы должны освобождать все тяжелые ресурсы после завершения каждого задания. Например, если вы выполняете манипуляции с изображениями с помощью библиотеки GD, вы должны освободить память с помощью imagedestroy
, когда закончите обработку изображения.
Приоритеты очереди
Иногда вы можете захотеть расставить приоритеты в том, как обрабатываются ваши очереди. Например, в вашем конфигурационном файле config/queue.php
вы можете установить очередь redis
по умолчанию для вашего соединения queue
на low
. Однако иногда вы можете захотеть поместить задание в очередь с high
приоритетом, например:
dispatch((new Job)->onQueue('high'));
Чтобы запустить воркер, который проверяет, что все задания из high
очереди обработаны, прежде чем переходить к любым заданиям из low
очереди, передайте список имен очередей, разделенных запятыми, в команду work
:
php artisan queue:work --queue=high,low
Обработчики очереди и развертывание
Поскольку обработчики очередей являются долгоживущими процессами, они не заметят изменений в вашем коде без перезапуска. Таким образом, самый простой способ развернуть приложение с помощью обработчиков очередей — перезапустить обработчиков во время процесса развертывания. Вы можете изящно перезапустить всех рабочих, выполнив команду queue:restart
:
php artisan queue:restart
Эта команда предписывает всем обработчикам очереди корректно завершить работу после завершения обработки текущего задания, чтобы ни одно из существующих заданий не было потеряно. Поскольку обработчики очереди завершатся при выполнении команды queue:restart
, вы должны запустить диспетчер процессов, например Supervisor, чтобы автоматически перезапускать обработчики очереди.
Очередь использует кеш для хранения сигналов перезапуска, поэтому перед использованием этой функции следует убедиться, что драйвер кеша правильно настроен для вашего приложения.
Истечения срока действия и тайм-ауты
Истечение срока действия
В вашем конфигурационном файле config/queue.php
каждое соединение с очередью определяет опцию retry_after
. Этот параметр указывает, сколько секунд должно ожидать соединение с очередью перед повторной попыткой выполнения обрабатываемого задания. Например, если значение retry_after
установлено на 90
, задание будет возвращено в очередь, если оно обрабатывалось в течение 90 секунд без освобождения или удаления. Как правило, вы должны установить значение retry_after
на максимальное количество секунд, которое разумно должно занять ваши задания для завершения обработки.
Единственное подключение к очереди, не содержащее значение
retry_after
, — это Amazon SQS. SQS повторит задание на основе тайм-аута видимости по умолчанию, который управляется в консоли AWS.
Время ожидания работника
Artisan-команда queue:work
предоставляет параметр --timeout
. Если задание обрабатывается дольше, чем количество секунд, указанное значением времени ожидания, рабочий процесс, обрабатывающий задание, завершится с ошибкой. Как правило, рабочий процесс будет автоматически перезапущен диспетчером процессов, настроенным на вашем сервере:
php artisan queue:work --timeout=60
Параметр конфигурации retry_after
и параметр CLI --timeout
различны, но работают вместе, чтобы гарантировать, что задания не будут потеряны и что задания будут успешно обработаны только один раз.
Значение
--timeout
всегда должно быть как минимум на несколько секунд короче, чем значение конфигурацииretry_after
. Это гарантирует, что рабочий процесс, обрабатывающий замороженное задание, всегда завершается до того, как задание будет повторено. Если ваш параметр--timeout
длиннее, чем значение конфигурацииretry_after
, ваши задания могут быть обработаны дважды.
Конфигурация супервизора
В производственной среде вам нужен способ поддерживать работу процессов queue:work
. Процесс queue:work
может прекратить работу по разным причинам, таким как превышение тайм-аута рабочего процесса или выполнение команды queue:restart
.
По этой причине вам необходимо настроить монитор процессов, который может определять, когда ваши процессы queue:work
завершаются, и автоматически перезапускать их. Кроме того, мониторы процессов могут позволить вам указать, сколько процессов queue:work
вы хотите запускать одновременно. Supervisor — это монитор процессов, обычно используемый в средах Linux, и мы обсудим, как его настроить, в следующей документации.
Установка супервизора
Supervisor — это монитор процессов для операционной системы Linux, который автоматически перезапустит ваши процессы queue:work
в случае сбоя. Чтобы установить Supervisor в Ubuntu, вы можете использовать следующую команду:
sudo apt-get install supervisor
Если самостоятельная настройка и управление Supervisor кажется сложной задачей, подумайте об использовании Laravel Forge, который автоматически установит и настроит Supervisor для ваших производственных проектов Laravel.
Настройка супервизора
Файлы конфигурации супервизора обычно хранятся в каталоге /etc/supervisor/conf.d
. В этом каталоге вы можете создать любое количество конфигурационных файлов, которые сообщают супервизору, как следует отслеживать ваши процессы. Например, давайте создадим файл laravel-worker.conf
, который запускает и контролирует процессы queue:work
:
[program:laravel-worker]process_name=%(program_name)s_%(process_num)02dcommand=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600autostart=trueautorestart=truestopasgroup=truekillasgroup=trueuser=forgenumprocs=8redirect_stderr=truestdout_logfile=/home/forge/app.com/worker.logstopwaitsecs=3600
В этом примере директива numprocs
указывает Supervisor запустить восемь процессов queue:work
и отслеживать их все, автоматически перезапуская их в случае сбоя. Вы должны изменить директиву command
конфигурации, чтобы отразить желаемое соединение с очередью и параметры рабочего процесса.
Вы должны убедиться, что значение
stopwaitsecs
больше, чем количество секунд, затраченных вашим самым продолжительным заданием. В противном случае Supervisor может уничтожить задание до завершения его обработки.
Начальный супервайзер
После создания файла конфигурации вы можете обновить конфигурацию супервизора и запустить процессы с помощью следующих команд:
sudo supervisorctl reread sudo supervisorctl update sudo supervisorctl start laravel-worker:*
Дополнительные сведения о Supervisor смотрите в документации супервизора.
Работа с неудачными заданиями
Иногда ваши поставленные в очередь задания не будут выполняться. Не волнуйтесь, не всегда все идет по плану! В Laravel есть удобный способ указать максимальное количество попыток выполнения задания. После того, как задание превысит это количество попыток, оно будет вставлено в таблицу базы данных failed_jobs
. Конечно, нам нужно будет создать эту таблицу, если она еще не существует. Чтобы создать миграцию для таблицы failed_jobs
, вы можете использовать команду queue:failed-table
:
php artisan queue:failed-table php artisan migrate
При запуске процесса queue worker вы можете указать максимальное количество попыток выполнения задания с помощью переключателя --tries
в команде queue:work
. Если вы не укажете значение для параметра --tries
, задания будут выполняться только один раз или столько раз, сколько указано в свойстве $tries
класса заданий:
php artisan queue:work redis --tries=3
Используя параметр --backoff
, вы можете указать, сколько секунд Laravel должен ждать, прежде чем повторить задание, которое столкнулось с исключением. По умолчанию задание немедленно помещается обратно в очередь, чтобы его можно было выполнить снова:
php artisan queue:work redis --tries=3 --backoff=3
Если вы хотите настроить, сколько секунд Laravel должен ждать, прежде чем повторить задание, которое столкнулось с исключением, для каждого задания, вы можете сделать это, определив свойство backoff
в вашем классе задания:
/** * The number of seconds to wait before retrying the job. * * @var int */public $backoff = 3;
Если вам требуется более сложная логика для определения времени отсрочки задания, вы можете определить метод backoff
в своем классе задания:
/*** Calculate the number of seconds to wait before retrying the job.** @return int*/public function backoff(){ return 3;}
Вы можете легко настроить «экспоненциальную» отсрочку, возвращая массив значений отсрочки из метода backoff
. В этом примере задержка повторной попытки будет составлять 1 секунду для первой попытки, 5 секунд для второй попытки и 10 секунд для третьей попытки:
/*** Calculate the number of seconds to wait before retrying the job.** @return array*/public function backoff(){ return [1, 5, 10];}
Уборка после неудачных заданий
Когда определенное задание завершается сбоем, вы можете отправить оповещение своим пользователям или отменить любые действия, которые были частично завершены заданием. Чтобы добиться этого, вы можете определить метод failed
в своем классе заданий. Экземпляр Throwable
, вызвавший сбой задания, будет передан методу failed
:
<?php namespace App\Jobs; use App\Models\Podcast;use App\Services\AudioProcessor;use Illuminate\Bus\Queueable;use Illuminate\Contracts\Queue\ShouldQueue;use Illuminate\Queue\InteractsWithQueue;use Illuminate\Queue\SerializesModels;use Throwable; class ProcessPodcast implements ShouldQueue{ use InteractsWithQueue, Queueable, SerializesModels; /** * The podcast instance. * * @var \App\Podcast */ protected $podcast; /** * Create a new job instance. * * @param \App\Models\Podcast $podcast * @return void */ public function __construct(Podcast $podcast) { $this->podcast = $podcast; } /** * Execute the job. * * @param \App\Services\AudioProcessor $processor * @return void */ public function handle(AudioProcessor $processor) { // Process uploaded podcast... } /** * Handle a job failure. * * @param \Throwable $exception * @return void */ public function failed(Throwable $exception) { // Send user notification of failure, etc... }}
Новый экземпляр задания создается перед вызовом метода
failed
; поэтому любые изменения свойств класса, которые могли произойти в методеhandle
, будут потеряны.
Повтор неудачных заданий
Чтобы просмотреть все невыполненные задания, которые были вставлены в вашу таблицу базы данных failed_jobs
, вы можете использовать Artisan-команду queue:failed
:
php artisan queue:failed
Команда queue:failed
выводит идентификатор задания, соединение, очередь, время сбоя и другую информацию о задании. Идентификатор задания может использоваться для повторной попытки неудачно выполненного задания. Например, чтобы повторить неудачное задание с идентификатором ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
, введите следующую команду:
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
При необходимости вы можете передать команде несколько идентификаторов:
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d
Вы также можете повторить все невыполненные задания для определенной очереди:
php artisan queue:retry --queue=name
Чтобы повторить все ваши неудачные задания, выполните команду queue:retry
и передайте all
в качестве идентификатора:
php artisan queue:retry all
Если вы хотите удалить невыполненное задание, вы можете использовать команду queue:forget
:
php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
При использовании Horizon для удаления невыполненного задания следует использовать команду
horizon:forget
вместо командыqueue:forget
.
Чтобы удалить все невыполненные задания из таблицы failed_jobs
, вы можете использовать команду queue:flush
:
php artisan queue:flush
Игнорирование отсутствующих моделей
При внедрении модели Eloquent в задание модель автоматически сериализуется перед помещением в очередь и повторно извлекается из базы данных при обработке задания. Однако, если модель была удалена, пока задание ожидало обработки рабочим процессом, ваше задание может завершиться с ошибкой ModelNotFoundException
.
Для удобства вы можете выбрать автоматическое удаление заданий с отсутствующими моделями, установив для свойства deleteWhenMissingModels
вашего задания значение true
. Когда для этого свойства установлено значение true
, Laravel спокойно отменит задание, не вызывая исключения:
/** * Delete the job if its models no longer exist. * * @var bool */public $deleteWhenMissingModels = true;
Сокращение неудачных заданий
Вы можете удалить все записи в таблице failed_jobs
вашего приложения, вызвав Artisan-команду queue:prune-failed
:
php artisan queue:prune-failed
Если вы укажете параметр --hours
для команды, будут сохранены только записи неудачных заданий, которые были вставлены в течение последних N часов. Например, следующая команда удалит все записи неудачных заданий, которые были вставлены более 48 часов назад:
php artisan queue:prune-failed --hours=48
Хранение невыполненных заданий в DynamoDB
Laravel также поддерживает хранение записей о невыполненных заданиях в DynamoDB вместо таблицы реляционной базы данных. Однако необходимо создать таблицу DynamoDB для хранения всех записей о невыполненных заданиях. Как правило, эта таблица должна называться failed_jobs
, но вы должны назвать таблицу на основе значения параметра конфигурации queue.failed.table
в файле конфигурации queue
вашего приложения.
Таблица failed_jobs
должна иметь строковый первичный ключ раздела с именем application
и строковый первичный ключ сортировки с именем uuid
. Часть ключа application
будет содержать имя вашего приложения, как определено значением конфигурации name
в файле конфигурации app
вашего приложения. Поскольку имя приложения является частью ключа таблицы DynamoDB, вы можете использовать одну и ту же таблицу для хранения невыполненных заданий для нескольких приложений Laravel.
Кроме того, убедитесь, что вы установили AWS SDK, чтобы ваше приложение Laravel могло взаимодействовать с Amazon DynamoDB:
composer require aws/aws-sdk-php
Затем установите для параметра конфигурации queue.failed.driver
значение dynamodb
. Кроме того, вы должны определить параметры конфигурации key
, secret
и region
в массиве конфигурации неудачного задания. Эти параметры будут использоваться для аутентификации в AWS. При использовании драйвера dynamodb
параметр конфигурации queue.failed.database
не нужен:
'failed' => [ 'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'), 'key' => env('AWS_ACCESS_KEY_ID'), 'secret' => env('AWS_SECRET_ACCESS_KEY'), 'region' => env('AWS_DEFAULT_REGION', 'us-east-1'), 'table' => 'failed_jobs',],
Отключение хранилища неудачных заданий
Вы можете указать Laravel отбрасывать неудачные задания без их сохранения, установив для параметра конфигурации queue.failed.driver
значение null
. Как правило, это можно сделать с помощью переменной окружения QUEUE_FAILED_DRIVER
:
QUEUE_FAILED_DRIVER=null
События неудачного задания
Если вы хотите зарегистрировать прослушиватель событий, который будет вызываться при сбое задания, вы можете использовать метод failing
фасада Queue
. Например, мы можем прикрепить замыкание к этому событию из метода boot
класса AppServiceProvider
, входящего в состав Laravel:
<?php namespace App\Providers; use Illuminate\Support\Facades\Queue;use Illuminate\Support\ServiceProvider;use Illuminate\Queue\Events\JobFailed; class AppServiceProvider extends ServiceProvider{ /** * Register any application services. * * @return void */ public function register() { // } /** * Bootstrap any application services. * * @return void */ public function boot() { Queue::failing(function (JobFailed $event) { // $event->connectionName // $event->job // $event->exception }); }}
Удаление заданий из очередей
При использовании Horizon, для удаления заданий из очереди следует использовать команду
horizon:clear
, а не командуqueue:clear
.
Если вы хотите удалить все задания из очереди по умолчанию соединения по умолчанию, вы можете сделать это с помощью Artisan-команды queue:clear
:
php artisan queue:clear
Вы также можете указать аргумент connection
и параметр queue
для удаления заданий из определенного соединения и очереди:
php artisan queue:clear redis --queue=emails
Удаление заданий из очередей доступно только для драйверов очередей SQS, Redis и базы данных. Кроме того, процесс удаления сообщения SQS занимает до 60 секунд, поэтому задания, отправленные в очередь SQS в течение 60 секунд после очистки очереди, также могут быть удалены.
Мониторинг ваших очередей
Если ваша очередь получает внезапный приток заданий, она может переполниться, что приведет к длительному времени ожидания завершения заданий. Если вы хотите, Laravel может предупредить вас, когда количество заданий в вашей очереди превысит указанный порог.
Для начала вы должны запланировать команду queue:monitor
выполняться каждую минуту. Команда принимает имена очередей, которые вы хотите отслеживать, а также желаемый порог количества заданий:
php artisan queue:monitor redis:default,redis:deployments --max=100
Одного планирования этой команды недостаточно, чтобы вызвать уведомление о переполненном статусе очереди. Когда команда обнаруживает очередь, в которой количество заданий превышает пороговое значение, будет отправлено событие Illuminate\Queue\Events\QueueBusy
. Вы можете прослушать это событие в EventServiceProvider
вашего приложения, чтобы отправить уведомление вам или вашей команде разработчиков:
use App\Notifications\QueueHasLongWaitTime;use Illuminate\Queue\Events\QueueBusy;use Illuminate\Support\Facades\Event;use Illuminate\Support\Facades\Notification; /** * Register any other events for your application. * * @return void */public function boot(){ Event::listen(function (QueueBusy $event) { Notification::route('mail', 'dev@example.com') ->notify(new QueueHasLongWaitTime( $event->connection, $event->queue, $event->size )); });}
События задания
Используя методы before
и after
в Queue
facade, вы можете указать обратные вызовы, которые должны выполняться до или после обработки задания в очереди. Эти обратные вызовы — отличная возможность выполнить дополнительную регистрацию или увеличить статистику для панели мониторинга. Как правило, эти методы следует вызывать из метода boot
поставщика услуг. Например, мы можем использовать AppServiceProvider
, включенный в Laravel:
<?php namespace App\Providers; use Illuminate\Support\Facades\Queue;use Illuminate\Support\ServiceProvider;use Illuminate\Queue\Events\JobProcessed;use Illuminate\Queue\Events\JobProcessing; class AppServiceProvider extends ServiceProvider{ /** * Register any application services. * * @return void */ public function register() { // } /** * Bootstrap any application services. * * @return void */ public function boot() { Queue::before(function (JobProcessing $event) { // $event->connectionName // $event->job // $event->job->payload() }); Queue::after(function (JobProcessed $event) { // $event->connectionName // $event->job // $event->job->payload() }); }}
Используя метод looping
в Queue
facade, вы можете указать обратные вызовы, которые выполняются до того, как рабочий процесс попытается получить задание из очереди. Например, вы можете зарегистрировать замыкание для отката любых транзакций, которые остались открытыми из-за предыдущего неудачного задания:
use Illuminate\Support\Facades\DB;use Illuminate\Support\Facades\Queue; Queue::looping(function () { while (DB::transactionLevel() > 0) { DB::rollBack(); }});