目录
前言
前面在本地的windows通过apache的ab工具测试了600并发下“查询指定手机是否存在再提交数据”的注册功能会出现重复提交的情况,并且在注册完成时还需要对邀请人进行奖励,记录邀请记录,对该新用户自动发布动态信息,发短信或发邮件等其他业务功能。所以这里当并发时,注册功能就变得低效且容易出现问题。
先对重复提交的问题通过redis解决,再把注册储存用户基本信息以后的操作放到队列中进行异步执行,可以很好的优化注册功能,提高QPS。
一、环境要求
PHP版本 >= 5.6.0
PHP框架:Thinkphp5.1.*
消息队列:Think-queue2.0
PHP扩展:Redis
二、下载框架和消息队列中间件
- 下载tp5.1。composer create-project topthink/think=5.1.* tp5 –prefer-dist
- 安装think-queue。composer require topthink/think-queue
- php安装redis扩展和打开redis服务端和客户端。
三、解决注册重复提交
配置文件中cache设置为redis驱动,并新建控制器因为cache相关命名空间。
use think\\Exception; use think\\facade\\Cache; use think\\facade\\Env; use think\\Queue;
- 使用无序集合存手机号,通过判断当前手机号是否是在指定键里为成员(如果注册存入数据库失败,通过sRem删除该成员),然后再通过查询数据库判断是否存在。
private $cache; private $handler; // 实例化redis public function __construct() { $this->cache = Cache::init(); $this->handler = $this->cache->handler(); } // 判断手机号是否在集合中 $is_existe = $this->handler->sIsMember(\"register:mobile\",$mobile); if(!$is_existe) { $this->handler->sAdd(\"register:mobile\",$mobile); }else { //Log::write(\'---压力测试\'.date(\"Y-m-d h:i:s\").\'---手机号已存在\'); var_dump(\'手机号已存在\'); // 用户已存在 die; } // 查询手机号码是否已注册 $user = db(\'user\')->field(\'mobile\')->where(\'mobile\', $mobile)->find(); if ($user) { //Log::write(\'---压力测试\'.date(\"Y-m-d h:i:s\").\'---手机号注册了\'); var_dump(\'手机号已注册\'); // 用户已存在 die; }
四、消息队列分解注册功能
- 配置消息队列,后面以redis驱动为例。
<?php return [ \'connector\' => \'Redis\', // Redis 驱动 \'expire\' => 60, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null \'default\' => \'default\', // 默认的队列名称 \'host\' => \'127.0.0.1\', // redis 主机ip \'port\' => 6379, // redis 端口 \'password\' => \'\', // redis 密码 \'select\' => 0, // 使用哪一个 db,默认为 db0 \'timeout\' => 0, // redis连接的超时时间 \'persistent\' => false, // 是否是长连接 // \'connector\' => \'Database\', // 数据库驱动 // \'expire\' => 60, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null // \'default\' => \'default\', // 默认的队列名称 // \'table\' => \'jobs\', // 存储消息的表名,不带前缀 // \'dsn\' => [], // \'connector\' => \'Topthink\', // ThinkPHP内部的队列通知服务平台 ,本文不作介绍 // \'token\' => \'\', // \'project_id\' => \'\', // \'protocol\' => \'https\', // \'host\' => \'qns.topthink.com\', // \'port\' => 443, // \'api_version\' => 1, // \'max_retries\' => 3, // \'default\' => \'default\', // \'connector\' => \'Sync\', // Sync 驱动,该驱动的实际作用是取消消息队列,还原为同步执行 ];
- 完成添加新用户后将指定数据加入消息队列。
<?php namespace app\\index\\controller; use think\\Db; use think\\Validate; use think\\Exception; use think\\facade\\Cache; use think\\facade\\Env; use think\\Queue; use think\\Log; class Index { private $cache; private $handler; public function __construct() { $this->cache = Cache::init(); $this->handler = $this->cache->handler(); } public function index() { $data = input(\'post.\'); unset($data[\'balance\']); unset($data[\'credit\']); // $blacklist = [ // \"18124198164\",\"13401363108\",\"17688552009\",\"15089352898\",\"13602940094\",\"13346643336\",\"13181351655\",\"18301123028\",\"13598020751\",\"13014568187\", // \"13428733909\",\"17337991130\",\"13275342497\" // ]; $rule = [ \'mobile\' => \'require|number|length:11\', \'password\' => \'require|length:6,32\', ]; $msg = [ \'mobile.require\' => \'手机号必须\', \'mobile.length\' => \'手机号为11位数字\', \'mobile.number\' => \'手机号为11位数字\', \'password.require\' => \'密码必须\', \'password.length\' => \'密码为6-12位之间\', ]; //验证数据是否合法 $mobile = isset($data[\'mobile\']) ? $data[\'mobile\'] : \'\'; $validate = new Validate($rule, $msg); $result = $validate->check($data); if (!$result) { var_dump($validate->getError()); die; } // if(in_array($mobile,$blacklist)) { // var_dump(\'该手机号已注册了\'); // 黑名单 // die; // } // 判断手机号是否在集合中 $is_existe = $this->handler->sIsMember(\"register:mobile\",$mobile); if(!$is_existe) { $this->handler->sAdd(\"register:mobile\",$mobile); }else { //Log::write(\'---压力测试\'.date(\"Y-m-d h:i:s\").\'---手机号已存在\'); var_dump(\'手机号已存在\'); // 用户已存在 die; } // 查询手机号码是否已注册 $user = db(\'user\')->field(\'mobile\')->where(\'mobile\', $mobile)->find(); if ($user) { //Log::write(\'---压力测试\'.date(\"Y-m-d h:i:s\").\'---手机号注册了\'); var_dump(\'手机号已注册\'); // 用户已存在 die; } // 用户不存在注册 // $data[\'id\'] = getNewUserid(); $data[\'no\'] = date(\"Ymdhis\").rand(100, 999); $data[\'avatar\'] = \'https://rumcdn-1255484416.cos.ap-chengdu.myqcloud.com/img/d_h.png\'; $data[\'password\'] = md5($data[\'password\']); $randomNickname = date(\"Ymdhis\").rand(100, 999); $data[\'nickname\'] = \'rm_\' . $randomNickname; $data[\'create_time\'] = time(); $data[\'type\'] = 1; /***是否存在邀请人的跑步钱进号***/ if(isset($data[\'pbqj_no\']) && !empty($data[\'pbqj_no\'])) { $inviter = db(\'user\')->field(\'id\')->where([\"no\"=>$data[\'pbqj_no\']])->find(); if($inviter) { $data[\'inviter_id\'] = $inviter[\'id\']; } } /***是否存在邀请人的跑步钱进号***/ unset($data[\'pbqj_no\']); $userid = db(\'user\')->insertGetId($data); if ($userid) { /******************加入消息队列异步处理后续操作*******************/ // 1.当前任务将由哪个类来负责处理。 // 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法 $jobHandlerClassName = \'app\\index\\job\\JobUser\'; // 2.当前任务归属的队列名称,如果为新队列,会自动创建 $jobQueueName = \"userJobQueue\"; // 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串 // ( jobData 为对象时,需要在先在此处手动序列化,否则只存储其public属性的键值对) //$jobData = [\'ts\' => time(), \'bizId\' => uniqid() , \'a\' => 1]; $jobData = [\'userid\'=>$userid,\'time\'=>time(),\'mobile\'=>$mobile,\'inviterid\'=>(isset($data[\'inviter_id\']) ? $data[\'inviter_id\'] : 0)]; // 4.将该任务推送到消息队列,等待对应的消费者去执行 $isPushed = Queue::push($jobHandlerClassName , $jobData , $jobQueueName); // database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false if($isPushed !== false) { var_dump(\'加入队列成功\'); die; //Log::write(\'-----------加入消息队列成功-----------\'); //echo date(\'Y-m-d H:i:s\') . \" a new Hello Job is Pushed to the MQ\".\" \"; }else{ var_dump(\'加入消息队列\'); die; //Log::write(\'-----------加入消息队列失败-----------\'); //echo \'Oops, something went wrong.\'; } /******************加入消息队列异步处理后续操作*******************/ $res[\'id\'] = $userid; $res[\'no\'] = $data[\'no\']; // // token处理类 // $accessToken = new AccessToken(); // $accessToken = $accessToken->getToken($userid); // if (empty($accessToken)) { // //Log::write(\'---压力测试\'.date(\"Y-m-d h:i:s\").\'---秘钥生成失败\'); // var_dump(\'秘钥生成失败\'); // } else { // $res[\'user_token\'] = $accessToken; // } // if (method_exists(\\chat\\User::class, \'getToken\')) { // $chat_token = \\chat\\User::getToken($res[\'id\'], $data[\'nickname\'], $data[\'avatar\']); // if (!$chat_token) { // //Log::write(\'---压力测试\'.date(\"Y-m-d h:i:s\").\'---聊天秘钥生成失败\'); // var_dump(\'聊天秘钥生成失败\'); // } else { // $res[\'chat_token\'] = $chat_token; // } // } else { // $res[\'chat_token\'] = \'\'; // } //Log::write(\'---压力测试\'.date(\"Y-m-d h:i:s\").\'---注册成功\'); var_dump($res); die; } else { //Log::write(\'---压力测试\'.date(\"Y-m-d h:i:s\").\'---数据库错误\'); $this->handler->sRem(\"register:mobile\",$mobile); var_dump(\'数据库错误\'); die; } } public function hello($name = \'ThinkPHP5\') { return \'hello,\' . $name; } }
创建消费者(job),对执行队列中的任务。
(1). 在同一模块下新建job文件夹和一个执行类(JobUser), 需要对应生产者中jobHandlerClassName。
(2). 前面执行完队列加入成功后,可以本地使用redis客户端通过lrange queues:userJobQueue 0 -1 查看队列成员
(queues:userJobQueue中,userJobQueue是自己在加入队列前自己起的队列名称,与queues: 拼接就是redis的list的键名,所以可以直接查看 )。
(3).队列中的data就是自己传递的数据,后面需要在消费者中通过该数据进行注册功能后的业务操作: 送奖励,存储邀请记录,发动态,发短信,发邮件等等。
<?php namespace app\\index\\job; use think\\queue\\Job; use think\\Db; use think\\Exception; use think\\facade\\Cache; use think\\facade\\Env; class JobUser { private $cache; private $handler; public function __construct() { $this->cache = Cache::init(); $this->handler = $this->cache->handler(); } /** * fire方法是消息队列默认调用的方法 * @param Job $job 当前的任务对象 * @param array|mixed $data 发布任务时自定义的数据 */ public function fire(Job $job,$data) { $job->delete(); //print(\"hahah\\n\"); // print(\"<info>The user already exists \".\"</info>\\n\"); // exit(); if(empty($data) || empty($data[\'userid\']) || empty($data[\'mobile\'])) { $job->delete(); print(\"canshu buzu\\n\"); return; } // 如有必要,可以根据业务需求和数据库中的最新数据,判断该任务是否仍有必要执行. $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data); if(!$isJobStillNeedToBeDone) { print(\"hahah\\n\"); $job->delete(); return; } $isJobDone = $this->doHelloJob($data); if ($isJobDone) { //如果任务执行成功, 记得删除任务 $job->delete(); print(\"<info>Hello Job has been done and deleted\".\"</info>\\n\"); }else{ if ($job->attempts() > 3) { //通过这个方法可以检查这个任务已经重试了几次了 print(\"<warn>Hello Job has been retried more than 3 times!\".\"</warn>\\n\"); //$job->delete(); // 也可以重新发布这个任务 //print(\"<info>Hello Job will be availabe again after 2s.\".\"</info>\\n\"); //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行 } } } /** * 有些消息在到达消费者时,可能已经不再需要执行了 * @param array|mixed $data 发布任务时自定义的数据 * @return boolean 任务执行的结果 */ private function checkDatabaseToSeeIfJobNeedToBeDone($data) { // 判断手机缓存集合中是否存在 // $is_existe = $this->handler->sIsMember(\"register:mobile\",$data[\'mobile\']); // if($is_existe) { // return false; // } // // 查询当前用户是否在数据库中存在 // $userinfo = Db::name(\'user\')->field(\'id\')->where(\'id\',$data[\'userid\'])->find(); // if($userinfo) { // return false; // } return true; } /** * 根据消息中的数据进行实际的业务处理 * @param array|mixed $data 发布任务时自定义的数据 * @return boolean 任务执行的结果 */ private function doHelloJob($data) { try{ if(isset($data[\'inviterid\']) && !empty($data[\'inviterid\'])) { // 添加邀请记录 $res_record = Db::name(\'user_inviter\') ->insert([ \'inviterid\' => $data[\'inviterid\'], \'userid\' => $data[\'userid\'], \'code\' => $data[\'inviterid\'] . \'T\' . $data[\'userid\'], \'create_time\' => $data[\'time\'], ]); // 给邀请人赠送300步币 Db::name(\'user_credit\') ->insert([ \'userid\' => $data[\'inviterid\'], \'type\' => 1, \'credit\' => 300, \'source\' => $res_record, \'create_time\' => $data[\'time\'] ]); // 更新邀请人步币(用户表) Db::name(\'user\')->where(\'id\', $data[\'inviterid\'])->setInc(\'credit\', 300); } { // 注册成功发表动态 $dynamic_data[\'userid\'] = $data[\'userid\']; $dynamic_data[\'dynamic\'] = base64_encode(\'号外!号外!我加入跑步钱进了,大家一起走路领红包吧!\'); $dynamic_data[\'images\'][] = \'https://rumcdn-1255484416.cos.ap-chengdu.myqcloud.com/img/d_d.png\'; $dynamic_data[\'images\'] = serialize($dynamic_data[\'images\']); $dynamic_data[\'create_time\'] = $data[\'time\']; $result = Db::name(\'dynamic\')->insert($dynamic_data); } }catch(\\Exception $e) { Log::write(\'---执行消息队列出错---\'.$e->getMessage()); return false; } return true; // 根据消息中的数据进行实际的业务处理... //var_dump($data); // print(\"<info>Hello Job Started. job Data is: \".var_export($data,true).\"</info> \\n\"); // print(\"<info>Hello Job is Fired at \" . date(\'Y-m-d H:i:s\') .\"</info> \\n\"); // print(\"<info>Hello Job is Done!\".\"</info> \\n\"); //return true; } /** * 该方法用于接收任务执行失败的通知,你可以发送邮件给相应的负责人员 * @param $jobData string|array|... //发布任务时传递的 jobData 数据 */ public function failed($jobData) { //send_mail_to_somebody() ; print(\"Warning: Job failed after max retries. job data is :\".var_export($jobData,true).\"\\n\"); } }
(4). 设置任务执行失败后的处理,比如记录日志或发邮件给开发者。
a. 在tags.php中配置失败后执行了类。
<?php // 应用行为扩展定义文件 return [ // 应用初始化 \'app_init\' => [], // 应用开始 \'app_begin\' => [], // 模块初始化 \'module_init\' => [], // 操作开始执行 \'action_begin\' => [], // 视图内容过滤 \'view_filter\' => [], // 日志写入 \'log_write\' => [], // 应用结束 \'app_end\' => [], \'queue_failed\' => [ // 数组形式,[ \'ClassName\' , \'methodName\'] [\'application\\\\behavior\\\\MyQueueFailedLogger\', \'logAllFailedQueues\'] // 字符串(静态方法),\'StaicClassName::methodName\' // \'MyQueueFailedLogger::logAllFailedQueues\' // 字符串(对象方法),\'ClassName\',此时需在对应的ClassName类中添加一个名为 queueFailed 的方法 // \'application\\\\behavior\\\\MyQueueFailedLogger\' // 闭包形式 /* function( &$jobObject , $extra){ // var_dump($jobObject); return true; } */ ], ];
b. 在application目录下创建任务错误执行后的处理脚本,根据业务需求自定。
<?php namespace app\\behavior; use think\\Db; class MyQueueFailedLogger { const should_run_hook_callback = true; /** * @param $jobObject \\think\\queue\\Job //任务对象,保存了该任务的执行情况和业务数据 * @return bool true //是否需要删除任务并触发其failed() 方法 */ public function logAllFailedQueues(&$jobObject) { $failedJobLog = [ \'jobHandlerClassName\' => $jobObject->getName(), // \'application\\index\\job\\Hello\' \'queueName\' => $jobObject->getQueue(), // \'helloJobQueue\' \'jobData\' => $jobObject->getRawBody()[\'data\'], // \'{\'a\': 1 }\' \'attempts\' => $jobObject->attempts(), // 3 ]; var_export(json_encode($failedJobLog,true)); $data = [ \"content\" => json_encode($failedJobLog,true), \"create_time\" => time(), ]; Db::name(\'ztest\')->insertGetId($data); // $jobObject->release(); //重发任务 //$jobObject->delete(); //删除任务 //$jobObject->failed(); //通知消费者类任务执行失败 return self::should_run_hook_callback; } }
五、通过命令运行消息队列,以下以windows举栗
- cmd进入当前项目, 然后输入 "php think queue:listen –queue userJobQueue" (userJobQueue是自己的队列名)。
- 也可以在项目的根目录创建bat文件,文件写入"php think queue:listen –queue userJobQueue",保存只需双击就可以执行。
六、测试
结果使用了消息队列后,同样610的并发,使用时间就缩短了
以上就是php解决注册并发问题并提高QPS的详细内容,更多关于php注册并发提高QPS的资料请关注其它相关文章!
暂无评论内容