php解决注册并发问题并提高QPS

目录

前言

前面在本地的windows通过apache的ab工具测试了600并发下“查询指定手机是否存在再提交数据”的注册功能会出现重复提交的情况,并且在注册完成时还需要对邀请人进行奖励,记录邀请记录,对该新用户自动发布动态信息,发短信或发邮件等其他业务功能。所以这里当并发时,注册功能就变得低效且容易出现问题。

先对重复提交的问题通过redis解决,再把注册储存用户基本信息以后的操作放到队列中进行异步执行,可以很好的优化注册功能,提高QPS。

一、环境要求

PHP版本 >= 5.6.0

PHP框架:Thinkphp5.1.*

消息队列:Think-queue2.0

PHP扩展:Redis

二、下载框架和消息队列中间件

  • 下载tp5.1。composer create-project topthink/think=5.1.* tp5 –prefer-dist
  • 安装think-queue。composer require topthink/think-queue 
  • php安装redis扩展和打开redis服务端和客户端。

三、解决注册重复提交

配置文件中cache设置为redis驱动,并新建控制器因为cache相关命名空间。

use think\\Exception;
use think\\facade\\Cache;
use think\\facade\\Env;
use think\\Queue;
  • 使用无序集合存手机号,通过判断当前手机号是否是在指定键里为成员(如果注册存入数据库失败,通过sRem删除该成员),然后再通过查询数据库判断是否存在。
private $cache;
private  $handler;
// 实例化redis
public function __construct() {
    $this->cache = Cache::init();
    $this->handler = $this->cache->handler();
}
// 判断手机号是否在集合中
$is_existe = $this->handler->sIsMember(\"register:mobile\",$mobile);
if(!$is_existe) {
   $this->handler->sAdd(\"register:mobile\",$mobile);
}else {
   //Log::write(\'---压力测试\'.date(\"Y-m-d h:i:s\").\'---手机号已存在\');
   var_dump(\'手机号已存在\');    // 用户已存在
   die;
}
// 查询手机号码是否已注册
$user = db(\'user\')->field(\'mobile\')->where(\'mobile\', $mobile)->find();
if ($user) {
    //Log::write(\'---压力测试\'.date(\"Y-m-d h:i:s\").\'---手机号注册了\');
    var_dump(\'手机号已注册\');    // 用户已存在
    die;
}

四、消息队列分解注册功能

  • 配置消息队列,后面以redis驱动为例。
<?php
return [
    \'connector\'  => \'Redis\',            // Redis 驱动
    \'expire\'     => 60,                // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
    \'default\'    => \'default\',        // 默认的队列名称
    \'host\'       => \'127.0.0.1\',        // redis 主机ip
    \'port\'       => 6379,            // redis 端口
    \'password\'   => \'\',                // redis 密码
    \'select\'     => 0,                // 使用哪一个 db,默认为 db0
    \'timeout\'    => 0,                // redis连接的超时时间
    \'persistent\' => false,            // 是否是长连接
    //    \'connector\' => \'Database\',   // 数据库驱动
    //    \'expire\'    => 60,           // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
    //    \'default\'   => \'default\',    // 默认的队列名称
    //    \'table\'     => \'jobs\',       // 存储消息的表名,不带前缀
    //    \'dsn\'       => [],
    //    \'connector\'   => \'Topthink\',    // ThinkPHP内部的队列通知服务平台 ,本文不作介绍
    //    \'token\'       => \'\',
    //    \'project_id\'  => \'\',
    //    \'protocol\'    => \'https\',
    //    \'host\'        => \'qns.topthink.com\',
    //    \'port\'        => 443,
    //    \'api_version\' => 1,
    //    \'max_retries\' => 3,
    //    \'default\'     => \'default\',
//        \'connector\'   => \'Sync\',        // Sync 驱动,该驱动的实际作用是取消消息队列,还原为同步执行
];
  • 完成添加新用户后将指定数据加入消息队列。
<?php
namespace app\\index\\controller;
use think\\Db;
use think\\Validate;
use think\\Exception;
use think\\facade\\Cache;
use think\\facade\\Env;
use think\\Queue;
use think\\Log;
class Index
{    
    private $cache;
    private  $handler;
    public function __construct() {
        $this->cache = Cache::init();
        $this->handler = $this->cache->handler();
    }
    public function index()
    {
        $data = input(\'post.\');
        unset($data[\'balance\']);
        unset($data[\'credit\']);
        // $blacklist = [
        //     \"18124198164\",\"13401363108\",\"17688552009\",\"15089352898\",\"13602940094\",\"13346643336\",\"13181351655\",\"18301123028\",\"13598020751\",\"13014568187\",
        //     \"13428733909\",\"17337991130\",\"13275342497\"
        // ];
        $rule = [
            \'mobile\' => \'require|number|length:11\',
            \'password\' => \'require|length:6,32\',
        ];
        $msg = [
            \'mobile.require\' => \'手机号必须\',
            \'mobile.length\' => \'手机号为11位数字\',
            \'mobile.number\' => \'手机号为11位数字\',
            \'password.require\' => \'密码必须\',
            \'password.length\' => \'密码为6-12位之间\',
        ];
        //验证数据是否合法
        $mobile = isset($data[\'mobile\']) ? $data[\'mobile\'] : \'\';
        $validate = new Validate($rule, $msg);
        $result = $validate->check($data);
        if (!$result) {
            var_dump($validate->getError());
            die;
        }
        // if(in_array($mobile,$blacklist)) {
        //     var_dump(\'该手机号已注册了\');    // 黑名单
        //     die;
        // }
        // 判断手机号是否在集合中
        $is_existe = $this->handler->sIsMember(\"register:mobile\",$mobile);
        if(!$is_existe) {
            $this->handler->sAdd(\"register:mobile\",$mobile);
        }else {
            //Log::write(\'---压力测试\'.date(\"Y-m-d h:i:s\").\'---手机号已存在\');
            var_dump(\'手机号已存在\');    // 用户已存在
            die;
        }
        // 查询手机号码是否已注册
        $user = db(\'user\')->field(\'mobile\')->where(\'mobile\', $mobile)->find();
        if ($user) {
            //Log::write(\'---压力测试\'.date(\"Y-m-d h:i:s\").\'---手机号注册了\');
            var_dump(\'手机号已注册\');    // 用户已存在
            die;
        }       
        // 用户不存在注册
        // $data[\'id\']          = getNewUserid();
        $data[\'no\'] = date(\"Ymdhis\").rand(100, 999);
        $data[\'avatar\'] = \'https://rumcdn-1255484416.cos.ap-chengdu.myqcloud.com/img/d_h.png\';
        $data[\'password\'] = md5($data[\'password\']);
        $randomNickname = date(\"Ymdhis\").rand(100, 999);         
        $data[\'nickname\'] = \'rm_\' . $randomNickname;
        $data[\'create_time\'] = time();
        $data[\'type\'] = 1;
        /***是否存在邀请人的跑步钱进号***/
        if(isset($data[\'pbqj_no\']) && !empty($data[\'pbqj_no\'])) {
            $inviter = db(\'user\')->field(\'id\')->where([\"no\"=>$data[\'pbqj_no\']])->find();
            if($inviter) {
                $data[\'inviter_id\'] = $inviter[\'id\'];
            }
        }
        /***是否存在邀请人的跑步钱进号***/
        unset($data[\'pbqj_no\']);
        $userid = db(\'user\')->insertGetId($data);
        if ($userid) {
        /******************加入消息队列异步处理后续操作*******************/
            // 1.当前任务将由哪个类来负责处理。 
            // 当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
            $jobHandlerClassName  = \'app\\index\\job\\JobUser\'; 
            // 2.当前任务归属的队列名称,如果为新队列,会自动创建
            $jobQueueName         = \"userJobQueue\"; 
            // 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串
            // ( jobData 为对象时,需要在先在此处手动序列化,否则只存储其public属性的键值对)
            //$jobData              = [\'ts\' => time(), \'bizId\' => uniqid() , \'a\' => 1];
            $jobData              = [\'userid\'=>$userid,\'time\'=>time(),\'mobile\'=>$mobile,\'inviterid\'=>(isset($data[\'inviter_id\']) ? $data[\'inviter_id\'] : 0)];
            // 4.将该任务推送到消息队列,等待对应的消费者去执行
            $isPushed = Queue::push($jobHandlerClassName , $jobData , $jobQueueName);    
            // database 驱动时,返回值为 1|false  ;   redis 驱动时,返回值为 随机字符串|false
            if($isPushed !== false) { 
                var_dump(\'加入队列成功\');
                die;
                //Log::write(\'-----------加入消息队列成功-----------\');
                //echo date(\'Y-m-d H:i:s\') . \" a new Hello Job is Pushed to the MQ\".\"
\";
            }else{
                var_dump(\'加入消息队列\');
                die;
                //Log::write(\'-----------加入消息队列失败-----------\');
                //echo \'Oops, something went wrong.\';
            }
        /******************加入消息队列异步处理后续操作*******************/
            $res[\'id\'] = $userid;
            $res[\'no\'] = $data[\'no\'];
            // // token处理类
            // $accessToken = new AccessToken();
            // $accessToken = $accessToken->getToken($userid);
            // if (empty($accessToken)) {
            //     //Log::write(\'---压力测试\'.date(\"Y-m-d h:i:s\").\'---秘钥生成失败\');
            //     var_dump(\'秘钥生成失败\');
            // } else {
            //     $res[\'user_token\'] = $accessToken;
            // }
            // if (method_exists(\\chat\\User::class, \'getToken\')) {
            //     $chat_token = \\chat\\User::getToken($res[\'id\'], $data[\'nickname\'], $data[\'avatar\']);
            //     if (!$chat_token) {
            //         //Log::write(\'---压力测试\'.date(\"Y-m-d h:i:s\").\'---聊天秘钥生成失败\');
            //         var_dump(\'聊天秘钥生成失败\');
            //     } else {
            //         $res[\'chat_token\'] = $chat_token;
            //     }
            // } else {
            //     $res[\'chat_token\'] = \'\';
            // }
            //Log::write(\'---压力测试\'.date(\"Y-m-d h:i:s\").\'---注册成功\');
            var_dump($res);
            die;
        } else {
            //Log::write(\'---压力测试\'.date(\"Y-m-d h:i:s\").\'---数据库错误\');
            $this->handler->sRem(\"register:mobile\",$mobile);
            var_dump(\'数据库错误\');
            die;
        }
    }
    public function hello($name = \'ThinkPHP5\')
    {
        return \'hello,\' . $name;
    }
}

创建消费者(job),对执行队列中的任务。

(1). 在同一模块下新建job文件夹和一个执行类(JobUser), 需要对应生产者中jobHandlerClassName。

(2). 前面执行完队列加入成功后,可以本地使用redis客户端通过lrange queues:userJobQueue 0 -1 查看队列成员

(queues:userJobQueue中,userJobQueue是自己在加入队列前自己起的队列名称,与queues: 拼接就是redis的list的键名,所以可以直接查看 )。

php解决注册并发问题并提高QPS

(3).队列中的data就是自己传递的数据,后面需要在消费者中通过该数据进行注册功能后的业务操作: 送奖励,存储邀请记录,发动态,发短信,发邮件等等。

<?php
namespace app\\index\\job;
use think\\queue\\Job;
use think\\Db;
use think\\Exception;
use think\\facade\\Cache;
use think\\facade\\Env;
class JobUser {
    private  $cache;
    private  $handler;
    public  function  __construct()
    {
        $this->cache = Cache::init();
        $this->handler = $this->cache->handler();
    }
    /**
     * fire方法是消息队列默认调用的方法
     * @param Job            $job      当前的任务对象
     * @param array|mixed    $data     发布任务时自定义的数据
     */
    public function fire(Job $job,$data) {
        $job->delete();
        //print(\"hahah\\n\");
        // print(\"<info>The user already exists \".\"</info>\\n\");
        //     exit();
        if(empty($data) || empty($data[\'userid\']) || empty($data[\'mobile\'])) {
            $job->delete();
            print(\"canshu buzu\\n\");
            return;
        }
        // 如有必要,可以根据业务需求和数据库中的最新数据,判断该任务是否仍有必要执行.
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
        if(!$isJobStillNeedToBeDone) {
            print(\"hahah\\n\");
            $job->delete();
            return;
        }
        $isJobDone = $this->doHelloJob($data);
        if ($isJobDone) {
            //如果任务执行成功, 记得删除任务
            $job->delete();
            print(\"<info>Hello Job has been done and deleted\".\"</info>\\n\");
        }else{
            if ($job->attempts() > 3) {
                //通过这个方法可以检查这个任务已经重试了几次了
                print(\"<warn>Hello Job has been retried more than 3 times!\".\"</warn>\\n\");
                //$job->delete();
                // 也可以重新发布这个任务
                //print(\"<info>Hello Job will be availabe again after 2s.\".\"</info>\\n\");
                //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
            }
        }
    }
    /**
     * 有些消息在到达消费者时,可能已经不再需要执行了
     * @param array|mixed    $data     发布任务时自定义的数据
     * @return boolean                 任务执行的结果
     */
    private function checkDatabaseToSeeIfJobNeedToBeDone($data) {
        // 判断手机缓存集合中是否存在
        // $is_existe = $this->handler->sIsMember(\"register:mobile\",$data[\'mobile\']);
        // if($is_existe) {
        //     return false;  
        // } 
        // // 查询当前用户是否在数据库中存在
        // $userinfo = Db::name(\'user\')->field(\'id\')->where(\'id\',$data[\'userid\'])->find();
        // if($userinfo) {
        //     return false;  
        // } 
        return true;
    }
    /**
     * 根据消息中的数据进行实际的业务处理
     * @param array|mixed    $data     发布任务时自定义的数据
     * @return boolean                 任务执行的结果
    */
    private function doHelloJob($data) {
        try{
            if(isset($data[\'inviterid\']) && !empty($data[\'inviterid\'])) {
                // 添加邀请记录
                $res_record = Db::name(\'user_inviter\')
                    ->insert([
                        \'inviterid\'   => $data[\'inviterid\'],
                        \'userid\'      => $data[\'userid\'],
                        \'code\'        => $data[\'inviterid\'] . \'T\' . $data[\'userid\'],
                        \'create_time\' => $data[\'time\'],
                ]);
                // 给邀请人赠送300步币
                Db::name(\'user_credit\')
                    ->insert([
                        \'userid\'      => $data[\'inviterid\'],
                        \'type\'        => 1,
                        \'credit\'      => 300,
                        \'source\'      => $res_record,
                        \'create_time\' => $data[\'time\']
                ]);
                // 更新邀请人步币(用户表)
                Db::name(\'user\')->where(\'id\', $data[\'inviterid\'])->setInc(\'credit\', 300);              
            }
            {   // 注册成功发表动态
                $dynamic_data[\'userid\'] = $data[\'userid\'];
                $dynamic_data[\'dynamic\'] = base64_encode(\'号外!号外!我加入跑步钱进了,大家一起走路领红包吧!\');
                $dynamic_data[\'images\'][] = \'https://rumcdn-1255484416.cos.ap-chengdu.myqcloud.com/img/d_d.png\';
                $dynamic_data[\'images\'] = serialize($dynamic_data[\'images\']);
                $dynamic_data[\'create_time\'] = $data[\'time\'];
                $result = Db::name(\'dynamic\')->insert($dynamic_data);
            }
        }catch(\\Exception $e) {
            Log::write(\'---执行消息队列出错---\'.$e->getMessage());
            return false;
        }
        return true;
        // 根据消息中的数据进行实际的业务处理...
        //var_dump($data);
//        print(\"<info>Hello Job Started. job Data is: \".var_export($data,true).\"</info> \\n\");
//        print(\"<info>Hello Job is Fired at \" . date(\'Y-m-d H:i:s\') .\"</info> \\n\");
//        print(\"<info>Hello Job is Done!\".\"</info> \\n\");
        //return true;
    }
    /**
     * 该方法用于接收任务执行失败的通知,你可以发送邮件给相应的负责人员
     * @param $jobData  string|array|...      //发布任务时传递的 jobData 数据
    */
    public function failed($jobData) {
        //send_mail_to_somebody() ;
        print(\"Warning: Job failed after max retries. job data is :\".var_export($jobData,true).\"\\n\");
    }
}

(4). 设置任务执行失败后的处理,比如记录日志或发邮件给开发者。

a. 在tags.php中配置失败后执行了类。

<?php
// 应用行为扩展定义文件
return [
    // 应用初始化
    \'app_init\'     => [],
    // 应用开始
    \'app_begin\'    => [],
    // 模块初始化
    \'module_init\'  => [],
    // 操作开始执行
    \'action_begin\' => [],
    // 视图内容过滤
    \'view_filter\'  => [],
    // 日志写入
    \'log_write\'    => [],
    // 应用结束
    \'app_end\'      => [],
    \'queue_failed\' => [
        // 数组形式,[ \'ClassName\' , \'methodName\']
        [\'application\\\\behavior\\\\MyQueueFailedLogger\', \'logAllFailedQueues\']
        // 字符串(静态方法),\'StaicClassName::methodName\'
        // \'MyQueueFailedLogger::logAllFailedQueues\'
        // 字符串(对象方法),\'ClassName\',此时需在对应的ClassName类中添加一个名为 queueFailed 的方法
        // \'application\\\\behavior\\\\MyQueueFailedLogger\'
        // 闭包形式
        /*
        function( &$jobObject , $extra){
            // var_dump($jobObject);
            return true;
        }
        */
    ],
];

b. 在application目录下创建任务错误执行后的处理脚本,根据业务需求自定。

<?php
namespace app\\behavior;
use think\\Db;
class MyQueueFailedLogger
{
    const should_run_hook_callback = true;
    /**
     * @param $jobObject   \\think\\queue\\Job   //任务对象,保存了该任务的执行情况和业务数据
     * @return bool     true                  //是否需要删除任务并触发其failed() 方法
    */
    public function logAllFailedQueues(&$jobObject) {
        $failedJobLog = [
            \'jobHandlerClassName\'   => $jobObject->getName(), // \'application\\index\\job\\Hello\'
            \'queueName\' => $jobObject->getQueue(),               // \'helloJobQueue\'
            \'jobData\'   => $jobObject->getRawBody()[\'data\'],  // \'{\'a\': 1 }\'
            \'attempts\'  => $jobObject->attempts(),            // 3
        ];
        var_export(json_encode($failedJobLog,true));
        $data = [
            \"content\" => json_encode($failedJobLog,true),
            \"create_time\" => time(),
        ];
        Db::name(\'ztest\')->insertGetId($data);
        // $jobObject->release();     //重发任务
        //$jobObject->delete();         //删除任务
        //$jobObject->failed();      //通知消费者类任务执行失败
        return self::should_run_hook_callback;
    }
}

五、通过命令运行消息队列,以下以windows举栗

  • cmd进入当前项目, 然后输入 "php think queue:listen –queue userJobQueue"   (userJobQueue是自己的队列名)。
  • 也可以在项目的根目录创建bat文件,文件写入"php think queue:listen –queue userJobQueue",保存只需双击就可以执行。 

六、测试

结果使用了消息队列后,同样610的并发,使用时间就缩短了

php解决注册并发问题并提高QPS

php解决注册并发问题并提高QPS

以上就是php解决注册并发问题并提高QPS的详细内容,更多关于php注册并发提高QPS的资料请关注其它相关文章!

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容