PHP 使用消息队列通常是使用中间件来实现,常用的消息中间件有:
redis
rabbitmq
kafka
redis 来实现异步邮件发送,redis 的数据类型中有一个 list 类型,可实现消息队列,使用以下命令:
// 入列
$redis->lpush($key, $data);
// 出列
$data = $redis->rpop($key);
// 阻塞出列
$data = $redis->brpop($key, 10);
邮件发送库选型
swiftmailer 是安装在 MixPHP 项目中,在项目根目录中执行以下命令安装:
composer require swiftmailer/swiftmailer
redis 会安装一个类库来使用
$redis = new \Redis();
if (!$redis->connect('127.0.0.1', 6379)) {
throw new \Exception('Redis Connect Failure');
}
$redis->auth('');
$redis->select(0);
// 投递任务
$data = [
'to' => ['***@qq.com' => 'A name'],
'body' => 'Here is the message itself',
'subject' => 'The title content',
];
$redis->lpush('queue:email', serialize($data));
通常异步开发中,投递完成后就会立即响应一个消息给用户,当然此时该任务并没有执行。
我们使用 MixPHP 的多进程开发工具 TaskExecutor 来完成这个需求,通常使用常驻进程来处理队列的消费,所以我们使用 TaskExecutor 的 TYPE_DAEMON 类型,MODE_PUSH 模式。
TaskExecutor 的 MODE_PUSH 模式有二种进程:
左进程:负责从消息队列取出任务数据,投放给中进程。
中进程:负责执行邮件发送任务。
<?php
namespace apps\daemon\commands;
use mix\console\ExitCode;
use mix\facades\Input;
use mix\facades\Redis;
use mix\task\CenterProcess;
use mix\task\LeftProcess;
use mix\task\TaskExecutor;
/**
* 推送模式范例
* @author 刘健 <coder.liu@qq.com>
*/
class PushCommand extends BaseCommand
{
// 配置信息
const HOST = 'smtpdm.aliyun.com';
const PORT = 465;
const SECURITY = 'ssl';
const USERNAME = '****@email.***.com';
const PASSWORD = '****';
// 初始化事件
public function onInitialize()
{
parent::onInitialize(); // TODO: Change the autogenerated stub
// 获取程序名称
$this->programName = Input::getCommandName();
// 设置pidfile
$this->pidFile = "/var/run/{$this->programName}.pid";
}
/**
* 获取服务
* @return TaskExecutor
*/
public function getTaskService()
{
return create_object(
[
// 类路径
'class' => 'mix\task\TaskExecutor',
// 服务名称
'name' => "mix-daemon: {$this->programName}",
// 执行类型
'type' => \mix\task\TaskExecutor::TYPE_DAEMON,
// 执行模式
'mode' => \mix\task\TaskExecutor::MODE_PUSH,
// 左进程数
'leftProcess' => 1,
// 中进程数
'centerProcess' => 5,
// 任务超时时间 (秒)
'timeout' => 5,
]
);
}
// 启动
public function actionStart()
{
// 预处理
if (!parent::actionStart()) {
return ExitCode::UNSPECIFIED_ERROR;
}
// 启动服务
$service = $this->getTaskService();
$service->on('LeftStart', [$this, 'onLeftStart']);
$service->on('CenterStart', [$this, 'onCenterStart']);
$service->start();
// 返回退出码
return ExitCode::OK;
}
// 左进程启动事件回调函数
public function onLeftStart(LeftProcess $worker)
{
try {
// 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
$queueModel = Redis::getInstance();
// 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
for ($j = 0; $j < 16000; $j++) {
// 从消息队列中间件阻塞获取一条消息
$data = $queueModel->brpop('queue:email', 10);
if (empty($data)) {
continue;
}
list(, $data) = $data;
// 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
$worker->push($data, false);
}
} catch (\Exception $e) {
// 休息一会,避免 CPU 出现 100%
sleep(1);
// 抛出错误
throw $e;
}
}
// 中进程启动事件回调函数
public function onCenterStart(CenterProcess $worker)
{
// 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
for ($j = 0; $j < 16000; $j++) {
// 从进程消息队列中抢占一条消息
$data = $worker->pop();
if (empty($data)) {
continue;
}
// 处理消息
try {
// 处理消息,比如:发送短信、发送邮件、微信推送
var_dump($data);
$ret = self::sendEmail($data);
var_dump($ret);
} catch (\Exception $e) {
// 回退数据到消息队列
$worker->rollback($data);
// 休息一会,避免 CPU 出现 100%
sleep(1);
// 抛出错误
throw $e;
}
}
}
// 发送邮件
public static function sendEmail($data)
{
// Create the Transport
$transport = (new \Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY))
->setUsername(self::USERNAME)
->setPassword(self::PASSWORD);
// Create the Mailer using your created Transport
$mailer = new \Swift_Mailer($transport);
// Create a message
$message = (new \Swift_Message($data['subject']))
->setFrom([self::USERNAME => '**网'])
->setTo($data['to'])
->setBody($data['body']);
// Send the message
$result = $mailer->send($message);
return $result;
}
}
在 shell 中启动 push 常驻程序。
./mix-daemon push start
mix-daemon 'push' start successed.
http://www.xinkang139.cn