queue
基于beanstalkd实现的任务队列,方便去分发任务和解决任务,此库可适用以下场景:
- 用作延时队列:比如可以用于如果用户30分钟内不操作,任务关闭。
- 用作定时任务:比如可以用于专门的后台任务。
- 用作异步操作:这是所有消息队列都最常用的,先将任务仍进去,顺序执行。
- 用作循环队列:用release命令可以循环执行任务,比如可以做负载均衡任务分发。
- 用作兜底机制:比如一个请求有失败的概率,可以用Beanstalk不断重试,设定超时时间,时间内尝试到成功为止
关于Beanstalkd
Beanstalkd 是一个轻量级的内存型队列,利用了和 Memcache 类似的协议。依赖 libevent 单线程事件分发机制, 可以部署多个实例,但是高并发支持还是不太友好;
几个重要的概念
- job:一个需要异步处理的任务,是 Beanstalkd 中的基本单元,需要放在一个 tube 中。
- tube:一个有名的任务队列,用来存储统一类型的 job,是 producer 和 consumer 操作的对象。
- producer:Job 的生产者,通过 put 命令来将一个 job 放到一个 tube 中。
- consumer:Job的消费者,通过 reserve/release/bury/delete 命令来获取 job 或改变 job 的状态。
Job的生命周期
任务在队里之中被称作 Job. 一个 Job 在 Beanstalkd 中有以下的生命周期:
- put 将一个任务放置进 tube 中
- deayed 这个任务现在再等待中,需要若干秒才能准备完毕【延迟队列】
- ready 这个任务已经准备好了,可以消费了。所有的消费都是要从取 ready 状态的 job
- reserved 这个任务已经被消费者消费
- release 这个 job 执行失败了,把它放进 ready 状态队列中。让其他队列执行
- bury 这个 job 执行失败了,但不希望其他队列执行,先把它埋起来
put with delay release with delay
----------------> [DELAYED] <------------.
| |
| (time passes) |
| |
put v reserve | delete
-----------------> [READY] ---------> [RESERVED] --------> *poof*
^ ^ | |
| \ release | |
| `-------------' |
| |
| kick |
| |
| bury |
[BURIED] <---------------'
|
| delete
`--------> *poof*
怎么使用
具体实例可参考测试例子
安装
composer require kickpeach/queue -vvv
Overview
Create Queue
use KickPeach\Queue\Drivers\Beanstalkd;
use KickPeach\Queue\Queue;
$queue = new Queue(new Beanstalkd($host, $port));
Create Job
<?php
use KickPeach\Queue\Job;
class ExampleJob extends Job
{
/**
* @var string job queue name (beanstalkd tube)
*/
public $queue = 'default';
/**
* The "time to run" for all pushed jobs. (beanstalkd ttr, timeout)
*
* @var int 允许 worker 执行的最大秒数,超时 job 将会被 release 到 ready 状态.
*/
public $retry_after = 60;
/**
* The number of times the job may be attempted.
*
* @var int 最大尝试次数
*/
public $tries = 1;
/**
* @var array
*/
public $words;
public function __construct(array $words)
{
$this->words = $words;
}
public function handle()
{
var_export($this->words);
var_dump($this->retry_after, $this->tries);
// throw new \Exception('handle job with error...lol ^_^');
}
}
specifying job queue by defining $queue
, specifying Max Job Attempts by defining $tries
, specifying timeout Values by defining $retry_after
.
Dispatch Job
$queue->push(new ExampleJob(['i', 'love', 'china']));
of cause, you can dispatch job later (push a delayed job) :
$queue->later(60, new ExampleJob(['i', 'love', 'china']));
Process Job
$worker = new Worker($queue);
$worker->daemon();
Note:
$worker->daemon()
is blocking.
by default, the worker will will listen the tube named default
, you can specifying worker queue (beanstalkd tube) like :
$queueTube = 'sendEmail';
$worker = new Worker($queue, $queueTube);
$worker->daemon();
you can specifying worker with sleep time while there is no job, and memoryLimit, like :
$sleep = 60;
$memoryLimit = 128;
$queueTube = 'sendEmail';
$worker = new Worker($queue, $queueTube);
$worker->daemon($sleep, $memoryLimit);
License
The MIT License (MIT).