中文在线一区二区_欧美在线综合_久久久久久综合_欧美一区二区三区视频_国产免费看_国产福利精品一区

Queue 介紹

原理

EasySwoole 封裝實(shí)現(xiàn)了一個(gè)輕量級(jí)的隊(duì)列,默認(rèn)以 Redis 作為隊(duì)列驅(qū)動(dòng)器。

可以自己實(shí)現(xiàn)一個(gè)隊(duì)列驅(qū)動(dòng)來(lái)實(shí)現(xiàn)隊(duì)列,用 kafka 作為隊(duì)列驅(qū)動(dòng)器或者 其他驅(qū)動(dòng)器方式 作為隊(duì)列驅(qū)動(dòng)器,來(lái)進(jìn)行存儲(chǔ)。

從上可知,Queue 并不是一個(gè)單獨(dú)使用的組件,它更像一個(gè)對(duì)不同驅(qū)動(dòng)的隊(duì)列進(jìn)行統(tǒng)一封裝的門面組件。

組件要求

  • ext-swoole: >=4.4.0
  • easyswoole/component: ^2.0
  • easyswoole/redis-pool: ~2.2.0

安裝方法

composer require easyswoole/queue=2.1.x

倉(cāng)庫(kù)地址

easyswoole/queue

基本使用

  • 注冊(cè)隊(duì)列驅(qū)動(dòng)器
  • 設(shè)置消費(fèi)進(jìn)程
  • 生產(chǎn)者投遞任務(wù)

定義一個(gè)隊(duì)列

<?php

namespace App\Utility;

use EasySwoole\Component\Singleton;
use EasySwoole\Queue\Queue;

class MyQueue extends Queue
{
    use Singleton;
}

定義消費(fèi)進(jìn)程

<?php

namespace App\Utility;

use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Queue\Job;

class QueueProcess extends AbstractProcess
{
    protected function run($arg)
    {
        go(function () {
            MyQueue::getInstance()->consumer()->listen(function (Job $job) {
                // 打印消費(fèi)數(shù)據(jù)
                var_dump($job->getJobData());
            });
        });
    }
}

可以多進(jìn)程,多協(xié)程消費(fèi)

驅(qū)動(dòng)注冊(cè)

<?php

namespace EasySwoole\EasySwoole;

use EasySwoole\Component\Timer;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\Http\Request;
use EasySwoole\Http\Response;
use EasySwoole\Queue\Job;

class EasySwooleEvent implements Event
{

    public static function initialize()
    {
        // TODO: Implement initialize() method.
        date_default_timezone_set('Asia/Shanghai');
    }

    public static function mainServerCreate(EventRegister $register)
    {
        // redis-pool 的使用請(qǐng)看 redis 章節(jié)文檔(http://www.jrrswxmm.cn/Components/Redis/pool.html)
        // 注冊(cè)一個(gè)名為 queue 的 Redis 連接池
        \EasySwoole\RedisPool\RedisPool::getInstance()->register(new \EasySwoole\Redis\Config\RedisConfig(
            [
                'host' => '127.0.0.1',
                'port' => '6379',
                // [可選參數(shù)] 密碼
                // 'auth' => ''
            ]
        ), 'queue');

        // 獲取 Redis 連接池中的一個(gè) Redis 連接對(duì)象
        $redisPool = \EasySwoole\RedisPool\RedisPool::getInstance()->getPool('queue');

        // 配置隊(duì)列驅(qū)動(dòng)器,底層使用 Redis 驅(qū)動(dòng),并設(shè)置隊(duì)列名為 'queue'
        $driver = new \EasySwoole\Queue\Driver\Redis($redisPool, 'queue');

        // 注冊(cè)自定義隊(duì)列
        \App\Utility\MyQueue::getInstance($driver);

        // 注冊(cè)一個(gè)消費(fèi)進(jìn)程
        \EasySwoole\Component\Process\Manager::getInstance()->addProcess(new \App\Utility\QueueProcess());

        // 模擬生產(chǎn)者,投遞任務(wù)到隊(duì)列中,可以在任意位置投遞
        $register->add($register::onWorkerStart, function ($server, $id) {
            if ($id == 0) {
                Timer::getInstance()->loop(3000, function () {
                    $job = new Job();
                    // 設(shè)置投遞的隊(duì)列任務(wù)數(shù)據(jù)
                    $job->setJobData(['time' => \time()]);
                    \App\Utility\MyQueue::getInstance()->producer()->push($job);
                });
            }
        });
    }
}

關(guān)于進(jìn)程安全退出問(wèn)題請(qǐng)看 進(jìn)程章節(jié)

進(jìn)階使用

我們可以自定義驅(qū)動(dòng),實(shí)現(xiàn) RabbitMQKafka 等消費(fèi)隊(duì)列軟件的封裝。

用戶需要定義類,并實(shí)現(xiàn) \EasySwoole\Queue\QueueDriverInterface 接口的幾個(gè)方法即可。該接口的詳細(xì)實(shí)現(xiàn)請(qǐng)看下文。

QueueDriverInterface 接口類實(shí)現(xiàn)

<?php

namespace EasySwoole\Queue;

interface QueueDriverInterface
{
    public function push(Job $job):bool ;

    public function pop(float $timeout = 3.0):?Job;

    public function size():?int ;
}

組件自帶的 Redis 隊(duì)列驅(qū)動(dòng)器實(shí)現(xiàn)

<?php

namespace EasySwoole\Queue\Driver;

use EasySwoole\Queue\Job;
use EasySwoole\Queue\QueueDriverInterface;
use EasySwoole\Redis\Redis as Connection;
use EasySwoole\RedisPool\Pool;

class Redis implements QueueDriverInterface
{

    protected $pool;
    protected $queueName;
    public function __construct(Pool $pool,string $queueName = 'easy_queue')
    {
        $this->pool = $pool;
        $this->queueName = $queueName;
    }

    public function push(Job $job): bool
    {
        $data = serialize($job);
        return $this->pool->invoke(function (Connection $connection)use($data){
            return $connection->lPush($this->queueName,$data);
        });
    }

    public function pop(float $timeout = 3.0): ?Job
    {
        return $this->pool->invoke(function (Connection $connection){
            $data =  $connection->rPop($this->queueName);
            if($data){
                return unserialize($data);
            }
            return null;
        });
    }

    public function size(): ?int
    {
        return $this->pool->invoke(function (Connection $connection){
            return $connection->lLen($this->queueName);
        });
    }
}

Queue 多節(jié)點(diǎn)使用

定義第一個(gè)隊(duì)列(自定義 nodeId)

<?php

namespace App\Utility;

use EasySwoole\Component\Singleton;
use EasySwoole\Queue\Queue;
use EasySwoole\Queue\QueueDriverInterface;

class MyQueue1 extends Queue
{
    use Singleton;

    public function __construct(QueueDriverInterface $driver)
    {
        parent::__construct($driver);
        // 自定義 nodeId
        $this->setNodeId('xxxxx1');
    }
}

定義第二個(gè)隊(duì)列(自動(dòng)生成 nodeId)

<?php

namespace App\Utility;

use EasySwoole\Component\Singleton;
use EasySwoole\Queue\Queue;

class MyQueue2 extends Queue
{
    use Singleton;
}

獲取節(jié)點(diǎn)id

<?php

namespace App\Utility;

use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Queue\Job;

class QueueProcess extends AbstractProcess
{
    protected function run($arg)
    {   
        // 消費(fèi)隊(duì)列
        go(function () {
            MyQueue1::getInstance()->consumer()->listen(function (Job $job) {
                // 打印 節(jié)點(diǎn)Id
                var_dump($job->getNodeId());
                // 打印 任務(wù)Id
                var_dump($job->getJobId());
            });
            MyQueue2::getInstance()->consumer()->listen(function (Job $job) {
                // 打印 節(jié)點(diǎn)Id
                var_dump($job->getNodeId());
                // 打印 任務(wù)Id
                var_dump($job->getJobId());
            });
        });
    }
}

可以多進(jìn)程,多協(xié)程消費(fèi)

驅(qū)動(dòng)注冊(cè)

<?php

namespace EasySwoole\EasySwoole;

use App\Utility\QueueProcess;
use EasySwoole\Component\Timer;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\Http\Request;
use EasySwoole\Http\Response;
use EasySwoole\Queue\Job;

class EasySwooleEvent implements Event
{

    public static function initialize()
    {
        // TODO: Implement initialize() method.
        date_default_timezone_set('Asia/Shanghai');
    }

    public static function mainServerCreate(EventRegister $register)
    {
        // redis-pool 的使用請(qǐng)看 redis 章節(jié)文檔(http://www.jrrswxmm.cn/Components/Redis/pool.html)
        // 注冊(cè)一個(gè)名為 queue 的 Redis 連接池
        \EasySwoole\RedisPool\RedisPool::getInstance()->register(new \EasySwoole\Redis\Config\RedisConfig(
            [
                'host' => '127.0.0.1',
                'port' => '6379',
                // [可選參數(shù)] 密碼
                // 'auth' => ''
            ]
        ), 'queue');

        // 獲取 Redis 連接池中的一個(gè) Redis 連接對(duì)象
        $redisPool = \EasySwoole\RedisPool\RedisPool::getInstance()->getPool('queue');

        // 配置隊(duì)列驅(qū)動(dòng)器,底層使用 Redis 驅(qū)動(dòng),并設(shè)置隊(duì)列名為 'queue'
        $driver = new \EasySwoole\Queue\Driver\Redis($redisPool, 'queue');

        // 【這里是重點(diǎn)】
        // 注冊(cè)自定義隊(duì)列
        \App\Utility\MyQueue1::getInstance($driver);
        \App\Utility\MyQueue2::getInstance($driver);

        // 注冊(cè)一個(gè)消費(fèi)進(jìn)程
        \EasySwoole\Component\Process\Manager::getInstance()->addProcess(new \App\Utility\QueueProcess());

        // 模擬生產(chǎn)者,投遞任務(wù)到隊(duì)列中,可以在任意位置投遞
        $register->add($register::onWorkerStart, function ($server, $id) {
            if ($id == 0) {
                Timer::getInstance()->loop(3000, function () {
                    $job = new Job();
                    // 設(shè)置投遞的隊(duì)列任務(wù)數(shù)據(jù)
                    $job->setJobData(['time' => \time()]);
                    // 這里是重點(diǎn)
                    \App\Utility\MyQueue1::getInstance()->producer()->push($job);
                    \App\Utility\MyQueue2::getInstance()->producer()->push($job);
                });
            }
        });
    }
}

相關(guān)倉(cāng)庫(kù)

EasySwoole 中利用 Redis 實(shí)現(xiàn)消息隊(duì)列

如何利用 EasySwoole 多進(jìn)程多協(xié)程 Redis 隊(duì)列實(shí)現(xiàn)爬蟲(chóng)

主站蜘蛛池模板: 国产三级一区二区三区 | 91精品一区二区三区久久久久久 | 亚洲日韩中文字幕一区 | 91精品久久久久久9s密挑 | 久草久| 国产欧美日韩免费 | a视频在线观看 | 日本不卡在线 | 秋霞精品 | 亚洲精品网址 | 亚洲午夜精品片久久www慈禧 | 国产成人精品免费视频大全最热 | 精品国产91亚洲一区二区三区www | 久视频在线观看 | 亚洲av毛片一区二二区三三区 | 欧美日韩久久久久 | 国产成人欧美一区二区三区的 | 久久99精品国产麻豆婷婷洗澡 | 中文字幕在线观看日韩 | 欧美日韩中文字幕 | 一级特黄毛片 | 成人性生交大片免费网站 | 久久久久久久国产精品 | 亚洲视频欧美视频 | 国产二区视频 | 欧美中文字幕一区二区三区亚洲 | 日韩av电影在线免费观看 | 99久久婷婷国产精品综合 | 午夜影院在线播放 | 一区二区在线不卡 | 日韩精品影院 | 日韩色在线 | 欧洲亚洲精品久久久久 | 国产精品久久久久久久一区探花 | 亚洲欧美中文字幕 | 96久久久 | 久久综合成人精品亚洲另类欧美 | 国产激情久久久久久 | 久久久久久国产精品 | 中文字幕视频 | 久久精品电影 |