Queue 介紹
原理
EasySwoole
封裝實(shí)現(xiàn)了一個(gè)輕量級(jí)的隊(duì)列,默認(rèn)以 Redis
作為隊(duì)列驅(qū)動(dòng)器。
可以自己實(shí)現(xiàn)一個(gè)隊(duì)列驅(qū)動(dòng)來(lái)實(shí)現(xiàn)隊(duì)列,用 kafka
作為隊(duì)列驅(qū)動(dòng)器或者 其他驅(qū)動(dòng)器方式
作為隊(duì)列驅(qū)動(dòng)器,來(lái)進(jìn)行存儲(chǔ)。
從上可知,Queue
并不是一個(gè)單獨(dú)使用的組件,它更像一個(gè)對(duì)不同驅(qū)動(dòng)的隊(duì)列進(jìn)行統(tǒng)一封裝的門面組件。
組件要求
- ext-swoole: >=4.4.0
- easyswoole/component: ^2.0
- easyswoole/redis-pool: ~2.2.0
安裝方法
composer require easyswoole/queue=2.1.x
倉(cāng)庫(kù)地址
基本使用
- 注冊(cè)隊(duì)列驅(qū)動(dòng)器
- 設(shè)置消費(fèi)進(jìn)程
- 生產(chǎn)者投遞任務(wù)
定義一個(gè)隊(duì)列
<?php
namespace App\Utility;
use EasySwoole\Component\Singleton;
use EasySwoole\Queue\Queue;
class MyQueue extends Queue
{
use Singleton;
}
定義消費(fèi)進(jìn)程
<?php
namespace App\Utility;
use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Queue\Job;
class QueueProcess extends AbstractProcess
{
protected function run($arg)
{
go(function () {
MyQueue::getInstance()->consumer()->listen(function (Job $job) {
// 打印消費(fèi)數(shù)據(jù)
var_dump($job->getJobData());
});
});
}
}
可以多進(jìn)程,多協(xié)程消費(fèi)
驅(qū)動(dòng)注冊(cè)
<?php
namespace EasySwoole\EasySwoole;
use EasySwoole\Component\Timer;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\Http\Request;
use EasySwoole\Http\Response;
use EasySwoole\Queue\Job;
class EasySwooleEvent implements Event
{
public static function initialize()
{
// TODO: Implement initialize() method.
date_default_timezone_set('Asia/Shanghai');
}
public static function mainServerCreate(EventRegister $register)
{
// redis-pool 的使用請(qǐng)看 redis 章節(jié)文檔(http://www.jrrswxmm.cn/Components/Redis/pool.html)
// 注冊(cè)一個(gè)名為 queue 的 Redis 連接池
\EasySwoole\RedisPool\RedisPool::getInstance()->register(new \EasySwoole\Redis\Config\RedisConfig(
[
'host' => '127.0.0.1',
'port' => '6379',
// [可選參數(shù)] 密碼
// 'auth' => ''
]
), 'queue');
// 獲取 Redis 連接池中的一個(gè) Redis 連接對(duì)象
$redisPool = \EasySwoole\RedisPool\RedisPool::getInstance()->getPool('queue');
// 配置隊(duì)列驅(qū)動(dòng)器,底層使用 Redis 驅(qū)動(dòng),并設(shè)置隊(duì)列名為 'queue'
$driver = new \EasySwoole\Queue\Driver\Redis($redisPool, 'queue');
// 注冊(cè)自定義隊(duì)列
\App\Utility\MyQueue::getInstance($driver);
// 注冊(cè)一個(gè)消費(fèi)進(jìn)程
\EasySwoole\Component\Process\Manager::getInstance()->addProcess(new \App\Utility\QueueProcess());
// 模擬生產(chǎn)者,投遞任務(wù)到隊(duì)列中,可以在任意位置投遞
$register->add($register::onWorkerStart, function ($server, $id) {
if ($id == 0) {
Timer::getInstance()->loop(3000, function () {
$job = new Job();
// 設(shè)置投遞的隊(duì)列任務(wù)數(shù)據(jù)
$job->setJobData(['time' => \time()]);
\App\Utility\MyQueue::getInstance()->producer()->push($job);
});
}
});
}
}
關(guān)于進(jìn)程安全退出問(wèn)題請(qǐng)看 進(jìn)程章節(jié)。
進(jìn)階使用
我們可以自定義驅(qū)動(dòng),實(shí)現(xiàn) RabbitMQ
、Kafka
等消費(fèi)隊(duì)列軟件的封裝。
用戶需要定義類,并實(shí)現(xiàn) \EasySwoole\Queue\QueueDriverInterface
接口的幾個(gè)方法即可。該接口的詳細(xì)實(shí)現(xiàn)請(qǐng)看下文。
QueueDriverInterface 接口類實(shí)現(xiàn)
<?php
namespace EasySwoole\Queue;
interface QueueDriverInterface
{
public function push(Job $job):bool ;
public function pop(float $timeout = 3.0):?Job;
public function size():?int ;
}
組件自帶的 Redis
隊(duì)列驅(qū)動(dòng)器實(shí)現(xiàn)
<?php
namespace EasySwoole\Queue\Driver;
use EasySwoole\Queue\Job;
use EasySwoole\Queue\QueueDriverInterface;
use EasySwoole\Redis\Redis as Connection;
use EasySwoole\RedisPool\Pool;
class Redis implements QueueDriverInterface
{
protected $pool;
protected $queueName;
public function __construct(Pool $pool,string $queueName = 'easy_queue')
{
$this->pool = $pool;
$this->queueName = $queueName;
}
public function push(Job $job): bool
{
$data = serialize($job);
return $this->pool->invoke(function (Connection $connection)use($data){
return $connection->lPush($this->queueName,$data);
});
}
public function pop(float $timeout = 3.0): ?Job
{
return $this->pool->invoke(function (Connection $connection){
$data = $connection->rPop($this->queueName);
if($data){
return unserialize($data);
}
return null;
});
}
public function size(): ?int
{
return $this->pool->invoke(function (Connection $connection){
return $connection->lLen($this->queueName);
});
}
}
Queue 多節(jié)點(diǎn)使用
定義第一個(gè)隊(duì)列(自定義 nodeId)
<?php
namespace App\Utility;
use EasySwoole\Component\Singleton;
use EasySwoole\Queue\Queue;
use EasySwoole\Queue\QueueDriverInterface;
class MyQueue1 extends Queue
{
use Singleton;
public function __construct(QueueDriverInterface $driver)
{
parent::__construct($driver);
// 自定義 nodeId
$this->setNodeId('xxxxx1');
}
}
定義第二個(gè)隊(duì)列(自動(dòng)生成 nodeId)
<?php
namespace App\Utility;
use EasySwoole\Component\Singleton;
use EasySwoole\Queue\Queue;
class MyQueue2 extends Queue
{
use Singleton;
}
獲取節(jié)點(diǎn)id
<?php
namespace App\Utility;
use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Queue\Job;
class QueueProcess extends AbstractProcess
{
protected function run($arg)
{
// 消費(fèi)隊(duì)列
go(function () {
MyQueue1::getInstance()->consumer()->listen(function (Job $job) {
// 打印 節(jié)點(diǎn)Id
var_dump($job->getNodeId());
// 打印 任務(wù)Id
var_dump($job->getJobId());
});
MyQueue2::getInstance()->consumer()->listen(function (Job $job) {
// 打印 節(jié)點(diǎn)Id
var_dump($job->getNodeId());
// 打印 任務(wù)Id
var_dump($job->getJobId());
});
});
}
}
可以多進(jìn)程,多協(xié)程消費(fèi)
驅(qū)動(dòng)注冊(cè)
<?php
namespace EasySwoole\EasySwoole;
use App\Utility\QueueProcess;
use EasySwoole\Component\Timer;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\Http\Request;
use EasySwoole\Http\Response;
use EasySwoole\Queue\Job;
class EasySwooleEvent implements Event
{
public static function initialize()
{
// TODO: Implement initialize() method.
date_default_timezone_set('Asia/Shanghai');
}
public static function mainServerCreate(EventRegister $register)
{
// redis-pool 的使用請(qǐng)看 redis 章節(jié)文檔(http://www.jrrswxmm.cn/Components/Redis/pool.html)
// 注冊(cè)一個(gè)名為 queue 的 Redis 連接池
\EasySwoole\RedisPool\RedisPool::getInstance()->register(new \EasySwoole\Redis\Config\RedisConfig(
[
'host' => '127.0.0.1',
'port' => '6379',
// [可選參數(shù)] 密碼
// 'auth' => ''
]
), 'queue');
// 獲取 Redis 連接池中的一個(gè) Redis 連接對(duì)象
$redisPool = \EasySwoole\RedisPool\RedisPool::getInstance()->getPool('queue');
// 配置隊(duì)列驅(qū)動(dòng)器,底層使用 Redis 驅(qū)動(dòng),并設(shè)置隊(duì)列名為 'queue'
$driver = new \EasySwoole\Queue\Driver\Redis($redisPool, 'queue');
// 【這里是重點(diǎn)】
// 注冊(cè)自定義隊(duì)列
\App\Utility\MyQueue1::getInstance($driver);
\App\Utility\MyQueue2::getInstance($driver);
// 注冊(cè)一個(gè)消費(fèi)進(jìn)程
\EasySwoole\Component\Process\Manager::getInstance()->addProcess(new \App\Utility\QueueProcess());
// 模擬生產(chǎn)者,投遞任務(wù)到隊(duì)列中,可以在任意位置投遞
$register->add($register::onWorkerStart, function ($server, $id) {
if ($id == 0) {
Timer::getInstance()->loop(3000, function () {
$job = new Job();
// 設(shè)置投遞的隊(duì)列任務(wù)數(shù)據(jù)
$job->setJobData(['time' => \time()]);
// 這里是重點(diǎn)
\App\Utility\MyQueue1::getInstance()->producer()->push($job);
\App\Utility\MyQueue2::getInstance()->producer()->push($job);
});
}
});
}
}
相關(guān)倉(cāng)庫(kù)
EasySwoole 中利用 Redis 實(shí)現(xiàn)消息隊(duì)列
如何利用 EasySwoole 多進(jìn)程多協(xié)程 Redis 隊(duì)列實(shí)現(xiàn)爬蟲(chóng)