博文記錄
PHP 2022-08-29 19:41:41 80 0

在 Hyperf 开发项目过程中经常使用 Job 进行一些耗时任务处理,而 Job 的执行是在异步消费队列中,在异步消费进程启动时会对数据进行初始化存储到容器的服务类理,以供处理后续的 Job 任务。

当用户通过 HTTP 接口更新数据到数据库中时,此时异步消费进程中的内存数据并没有得到及时更新,这就会造成一些困扰。

如何解决这个问题呢?

  1. 在 Job 执行时,每次读取一次最新的配置信息。虽然不会有延迟,但是当Job过多时,读取数据库的频次会非常高,影响 Job 处理速度,以及网络资源消耗。
  2. 在 Job 执行时,判断上次更新的时间,如果超过预设时间则进行更新。缺点是依然有延迟。

希望能够通过一个事件的方式进行触发更新,当 HTTP 接口产生了数据变更,此时将内存数据进行更新。

但是 HTTP 接口所在的进程是多个 worker 进程,并不能直接对异步进程进行读写操作。通过系统自带的  $container 进行获取服务类,是获取当前 worker 进程内的,而不是异步消费进程的。

还有个思路,就是当异步消费进程配置为1个时,可以在配置更新时产生一个新Job,在新 Job 里面对内存进行更新,因为Job执行所处的进程是异步消费进程。

但如果配置了多个消费进程,如3个,就无法通过这样的方式,因为Job无法确定会落在哪个进程里执行,只能让一个进程失效,无法实现更新所有进程。

配置文件在 async_queue.php 中配置消费进程  processes 进程个数。

解决方案

这里通过 swoole 自带的进程通信进行触发自定义消息事件,并创建触发器监听自定义消息事件,自定义触发器所在的进程也是异步消费进程,只需要触发器对自身进程进行更新就行了。

在调用触发端一次性获取所有进程,进行投递消息即可实现对所有异步消费进程进行更新。

 

创建一个自定义 Event

app/Event/BlackListUpdated.php

<?php

namespace App\Event;
class BlackListUpdated
{

    public $message = '';

    public function __construct(string $message)
    {
        $this->message = $message;
    }

}
 

创建自定义事件监听器

app/Listener/BlackListUpdatedListener.php

<?php

namespace App\Listener;


use App\Event\BlackListUpdated;
use App\Services\BlackListService;
use Hyperf\Contract\ContainerInterface;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Process\ProcessCollector;
use Hyperf\Event\Annotation\Listener;
use Hyperf\Framework\Event\OnPipeMessage;
use Hyperf\Process\Event\PipeMessage as UserProcessPipeMessage;

/**
 * @Listener
 *
 * Class BlackListUpdatedListener
 * @package App\Listener
 */
class BlackListUpdatedListener implements ListenerInterface
{
    protected $container;

    public function __construct(ContainerInterface $container)
    {
        $this->container = $container;
    }


    public function listen(): array
    {
        // 返回一個該監聽器要監聽的事件陣列,可以同時監聽多個事件
        return [
            OnPipeMessage::class,
            UserProcessPipeMessage::class,
        ];
    }

    /**
     * @param object $event
     */
    public function process(object $event)
    {
        // PipeMessage
        if ($event instanceof OnPipeMessage || $event instanceof UserProcessPipeMessage) {
            if($event->data instanceof BlackListUpdated){
                $this->container->get(BlackListService::class)->flushKeyword();
            }
        }

    }
}

这里不要监听 BlackListUpdated 事件,因为并不能给所有进程投递 BlackListUpdated 事件。

在 process 方法执行中判断消息数据类型对容器数据进行更新。

消息事件投递端

在 HTTP 接口中对数据更新后进行投递事件。

<?php

//...
    /**
     * 加入关键词
     *
     * @param Request $request
     * @return array
     */
    public function addKeyword(Request $request): array
    {
        $keyword = $request->input('keyword');
        $this->blackListService->addKeyword($keyword);

        if (class_exists(ProcessCollector::class) && !ProcessCollector::isEmpty()) {
            $processes = ProcessCollector::all();
            if ($processes) {
                $string = serialize(new BlackListUpdated('update keyword'. date('Y-m-d H:i:s')));
                foreach ($processes as $process) {
                    $process->exportSocket()->send($string, 10);
                }
            }
        }

        return eeJson('add keyword into blacklist service');
    }

//...

 

發表評論
StudioEIM - 冒险者讲习所
0:00