Mix PHP V2 实例: AliCloud 短信协程池异步发送守护程序 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
onanying
V2EX    PHP

Mix PHP V2 实例: AliCloud 短信协程池异步发送守护程序

  •  
  •   onanying 2019 年 5 月 24 日 3332 次点击
    这是一个创建于 2440 天前的主题,其中的信息可能已经有所发展或是发生改变。

    前些时间我们发布了 Mix PHP V2 实例:协程池异步邮件发送守护程序 范例,这一次我们提供一个使用大厂 SDK 通过 Swoole Hook 协程化来并行执行短信发送任务,本文是一个代码简单、IO 性能极强的范例。

    请先升级到 mix-framework >= v2.0.5

    本范例依然使用消息队列的方式接收短信发送任务,消息中间件使用:

    • redis

    生产者

    通常框架中使用 Redis 会安装一个类库来使用,本例使用原生代码,便于理解。

    // 连接 $redis = new \Redis(); if (!$redis->connect('127.0.0.1', 6379)) { throw new \Exception('Redis connect failed.'); } $redis->auth(''); $redis->select(0); // 投递任务 for($i = 0; $i < 3; $i++){ $data = [ 'phone' => '***', 'templateCode' => 'SMS_***', 'templateParam' => ['code' => 123456], ]; $redis->lpush('queue:sms', serialize($data)); } 

    消费者

    使用的是 ali 云的短信服务,查看官方 PHP SDK 文档 ,使用的库为:

    composer require alibabacloud/client 

    通过查看该库的 composer 依赖文件,我们得知该库基于 guzzlehttp 开发,因为 Mix PHP 提供了无需修改代码就可 Hook Guzzle 库可在协程中使用的工具 Mix PHP V2 生态:让 Guzzle 支持 Swoole 的 Hook 协程,所以能基本确定该库可在 Swoole 协程中使用。

    首先我们安装 https://github.com/mix-php/guzzle-hookalibabacloud/client 可在协程中使用:

    composer require mix/guzzle-hook 

    然后在项目的 composer.json 文件中增加 extra 配置项,如下:

    "extra": { "include_files": [ "vendor/mix/guzzle-hook/src/functions_include.php" ] } 

    更新自动加载:

    composer dump-autoload 

    下面我们采用 Mix PHP V2 的守护程序、协程池来完成一个超高性能的短信发送程序。

    首先我们在配置 applications/console/config/main.php 中注册一个命令:

    // 命令 'commands' => [ 'smser' => [ 'Smser', 'description' => "SMS send daemon demo.", 'options' => [ [['d', 'daemon'], 'description' => 'Run in the background'], ], ], ], 

    注册的命令中指定的 Smser 命令类,接下来我们编写一个 SmserCommand 类:

    applications/console/src/Commands/SmserCommand.php 
    <?php namespace Console\Commands; use Console\Libraries\SmserWorker; use Mix\Concurrent\CoroutinePool\Dispatcher; use Mix\Console\CommandLine\Flag; use Mix\Core\Coroutine; use Mix\Core\Coroutine\Channel; use Mix\Core\Event; use Mix\Helper\ProcessHelper; use AlibabaCloud\Client\AlibabaCloud; /** * Class SmserCommand * @package Daemon\Commands * @author liu,jian <[email protected]> */ class SmserCommand { const ACCESS_KEY = '***'; const ACCESS_SECRET = '***'; /** * 退出 * @var bool */ public $quit = false; /** * 主函数 */ public function main() { // 守护处理 $daemon = Flag::bool(['d', 'daemon'], false); if ($daemon) { ProcessHelper::daemon(); } // 捕获信号 ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], function ($signal) { $this->quit = true; ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], null); }); // 设置 ali 云全局参数 AlibabaCloud::accessKeyClient(static::ACCESS_KEY, static::ACCESS_SECRET)->regionId('cn-hangzhou')->asDefaultClient(); // 手动关闭 Swoole 文件 Hook,因为 ali 云依赖的 uuid 库有文件 hook 协程兼容问题,Swoole 4.4 已经适配该问题 Coroutine::enableHook(SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_FILE); // 协程池执行任务 xgo(function () { $maxWorkers = 20; $maxQueue = 20; $jobQueue = new Channel($maxQueue); $dispatch = new Dispatcher([ 'jobQueue' => $jobQueue, 'maxWorkers' => $maxWorkers, ]); $dispatch->start(SmserWorker::class); // 投放任务 $redis = app()->redisPool->getConnection(); while (true) { if ($this->quit) { $dispatch->stop(); return; } try { $data = $redis->brPop(['queue:sms'], 3); } catch (\Throwable $e) { $dispatch->stop(); return; } if (!$data) { continue; } $data = array_pop($data); // brPop 命令最后一个键才是值 $jobQueue->push($data); } }); // 等待事件 Event::wait(); } } 

    $data = $redis->brPop(['queue:sms'], 3); 外部的异常捕获可得知,当 Redis 连接出错时,比如 Redis 重启、连接异常时协程池会安全退出,也就是说当进程异常退出后用户需使用 supervisorpm2 等工具重启守护进程。

    上面是一个 Mix PHP 协程池的使用代码,基本可以直接复制使用,框架默认包含了协程池的 Demo,本次实例只是修改了协程池的 Worker,本命令主要是完成从 Redis 队列中获取消息然后 push 到 jobQueue 中,jobQueue 中的数据会被 20 个 Worker 实例中某一个抢占后并行执行,本例的发送代码逻辑就在 SmserWorker 类中:

    applications/console/src/Libraries/SmserWorker.php 
    <?php namespace Console\Libraries; use Mix\Concurrent\CoroutinePool\AbstractWorker; use Mix\Concurrent\CoroutinePool\WorkerInterface; /** * Class SmserWorker * @package Daemon\Libraries * @author liu,jian <[email protected]> */ class SmserWorker extends AbstractWorker implements WorkerInterface { /** * 邮件发送器 * @var Smser */ public $smser; /** * 初始化事件 */ public function onInitialize() { parent::onInitialize(); // TODO: Change the autogenerated stub // 实例化一些需重用的对象 $this->smser = new Smser(); } /** * 处理 * @param $data */ public function handle($data) { // TODO: Implement handle() method. $data = unserialize($data); if (empty($data)) { return; } try { $result = $this->smser->send($data['phone'], $data['templateCode'], $data['templateParam']); app()->log->info("SMS sent successfully:phone {phone} templateCode {templateCode} result {result}", array_merge($data, ['result' => json_encode($result, JSON_UNESCAPED_UNICODE)])); } catch (\Throwable $e) { app()->log->error("SMS failed to send:phone {phone} templateCode {templateCode} error {error}", array_merge($data, ['error' => $e->getMessage()])); } } } 

    由以上代码可见,Worker 在初始化时,新增了一个 Smser 类的属性,当 jobQueue 消息投递过来时消息会传递到 handle 方法,在该方法中使用 Mailer 类的实例完成邮件发送任务,所以我们要编写了一个 Smser 发送程序:

    applications/console/src/Libraries/Smser.php 
    <?php namespace Console\Libraries; use AlibabaCloud\Client\AlibabaCloud; use AlibabaCloud\Client\Exception\ClientException; use AlibabaCloud\Client\Exception\ServerException; use Mix\Core\Coroutine; /** * Class Smser * @package Console\Libraries * @author liu,jian <[email protected]> */ class Smser { /** * 配置信息 */ const SIGN_NAME = '***'; /** * Smser constructor. */ public function __construct() { // 开启协程钩子 Coroutine::enableHook(); } /** * 发送 * @param $phone * @param $templateCode * @param $templateParam * @return array * @throws ClientException * @throws ServerException */ public function send($phone, $templateCode, $templateParam) { $result = AlibabaCloud::rpc() ->product('Dysmsapi') // ->scheme('https') // https | http ->version('2017-05-25') ->action('SendSms') ->method('POST') ->options([ 'query' => [ 'PhoneNumbers' => $phone, 'SignName' => static::SIGN_NAME, 'TemplateCode' => $templateCode, 'TemplateParam' => json_encode($templateParam), ], ]) ->request(); return $result->toArray(); } } 

    以上就完成了全部的代码逻辑,现在我们开始测试,先启动消费者守护程序:

    [root@localhost bin]# ./mix-console smser 

    将上文的生产者脚本命名为 push.php 然后在 CLI 中执行 (开一个新终端):

    [root@localhost bin]# php /tmp/push.php 

    消费者守护程序结果:

    [root@localhost bin]# ./mix-console smser [info] 2019-05-24 12:03:32 <101014> [message] SMS sent successfully:phone *** templateCode SMS_*** result {"Message":"OK","RequestId":"4071D031-6D9E-4F70-9269-6C1979080858","BizId":"939807358670612546^0","Code":"OK"} [info] 2019-05-24 12:03:32 <101014> [message] SMS sent successfully:phone *** templateCode SMS_*** result {"Message":"触发分钟级流控 Permits:1","RequestId":"490B73D7-317E-4362-B2DD-5E2153A7B891","Code":"isv.BUSINESS_LIMIT_CONTROL"} [info] 2019-05-24 12:03:32 <101014> [message] SMS sent successfully:phone *** templateCode SMS_*** result {"Message":"触发分钟级流控 Permits:1","RequestId":"1FD22EDB-BAA4-4416-8FF9-242EDCF34359","Code":"isv.BUSINESS_LIMIT_CONTROL"} 

    命令行终端打印了发送成功的日志,发送完成。

    目前尚无回复
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     3292 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 31ms UTC 12:01 PVG 20:01 LAX 04:01 JFK 07:01
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86