php使用swoole实现TCP服务

这里以在Yii框架下示例

一:swoole配置TCP

\'swoole\' => [
    // 日志文件路径
    \'log_file\' => \'@console/log/swoole.log\',
    // 设置swoole_server错误日志打印的等级,范围是0-5。低于log_level设置的日志信息不会抛出
    \'log_level\' => 1,
    // 进程的PID存储文件
    \'pid_file\' => \'@console/log/swoole.server.pid\',

    // HTTP协议配置
    \'http\' => [
        \'host\' => \'0.0.0.0\',
        \'port\' => \'8889\',

        // 异步任务的工作进程数量
        \'task_worker_num\' => 4,
    ],
    // TCP协议配置
    \'tcp\' => [
        \'host\' => \'0.0.0.0\',
        \'port\' => \'14000\',

        // 异步任务的工作进程数量
        \'task_worker_num\' => 4,

        // 启用TCP-Keepalive死连接检测
        \'open_tcp_keepalive\' => 1,
        // 单位秒,连接在n秒内没有数据请求,将开始对此连接进行探测
        \'tcp_keepidle\' => 5 * 60,
        // 探测的次数,超过次数后将close此连接
        \'tcp_keepcount\' => 3,
        // 探测的间隔时间,单位秒
        \'tcp_keepinterval\' => 60,

        // 心跳检测,此选项表示每隔多久轮循一次,单位为秒
        \'heartbeat_check_interval\' => 2 * 60,
        // 心跳检测,连接最大允许空闲的时间
        \'heartbeat_idle_time\' => 5 * 60,
    ]
],

二:swoole实现TCP服务基类

<?php
/**
 * @link http://www.u-bo.com
 * @copyright 南京友博网络科技有限公司
 * @license http://www.u-bo.com/license/
 */

namespace console\\swoole;

use Yii;
use yii\\helpers\\Console;
use yii\\helpers\\ArrayHelper;

/*
 * Swoole Server基类
 *
 * @author wangjian
 * @since 0.1
 */
abstract class BaseServer
{
    /**
     * @var Swoole\\Server
     */
    public $swoole;
    /**
     * @var boolean DEBUG
     */
    public $debug = false;

    /**
     * __construct
     */
    public function __construct($httpConfig, $tcpConfig, $config = [])
    {
        $httpHost = ArrayHelper::remove($httpConfig, \'host\');
        $httpPort = ArrayHelper::remove($httpConfig, \'port\');
        $this->swoole = new \\swoole_http_server($httpHost, $httpPort);
        $this->swoole->set(ArrayHelper::merge($config, $httpConfig));

        $this->swoole->on(\'start\', [$this, \'onStart\']);
        $this->swoole->on(\'request\', [$this, \'onRequest\']);
        $this->swoole->on(\'WorkerStart\', [$this, \'onWorkerStart\']);
        $this->swoole->on(\'WorkerStop\', [$this, \'onWorkerStop\']);
        $this->swoole->on(\'task\', [$this, \'onTask\']);
        $this->swoole->on(\'finish\', [$this, \'onTaskFinish\']);

        $this->swoole->on(\'shutdown\', [$this, \'onShutdown\']);

        $tcpHost = ArrayHelper::remove($tcpConfig, \'host\');
        $tcpPort = ArrayHelper::remove($tcpConfig, \'port\');
        $tcpServer = $this->swoole->listen($tcpHost, $tcpPort, SWOOLE_SOCK_TCP);
        $tcpServer->set($tcpConfig);
        $tcpServer->on(\'connect\', [$this, \'onConnect\']);
        $tcpServer->on(\'receive\', [$this, \'onReceive\']);
        $tcpServer->on(\'close\', [$this, \'onClose\']);
    }

    /*
     * 启动server
     */
    public function run()
    {
        $this->swoole->start();
    }

    /**
     * Server启动在主进程的主线程时的回调事件处理
     *
     * @param swoole_server $server
     */
    public function onStart(\\swoole_server $server)
    {
        $startedAt = $this->beforeExec();

        $this->stdout(\"**Server Start**\\n\", Console::FG_GREEN);
        $this->stdout(\"master_pid: \");
        $this->stdout(\"{$server->master_pid}\\n\", Console::FG_BLUE);

        $this->onStartHandle($server);

        $this->afterExec($startedAt);
    }

    /**
     * 客户端与服务器建立连接后的回调事件处理
     *
     * @param swoole_server $server
     * @param integer $fd
     * @param integer $reactorId
     */
    abstract public function onConnect(\\swoole_server $server, int $fd, int $reactorId);

    /**
     * 当服务器收到来自客户端的数据时的回调事件处理
     *
     * @param swoole_server $server
     * @param integer $fd
     * @param integer $reactorId
     * @param string $data
     */
    abstract public function onReceive(\\swoole_server $server, int $fd, int $reactorId, string $data);

    /**
     * 当服务器收到来自客户端的HTTP请求时的回调事件处理
     *
     * @param swoole_http_request $request
     * @param swoole_http_response $response
     */
    abstract public function onRequest(\\swoole_http_request $request, \\swoole_http_response $response);

    /**
     * Worker进程/Task进程启动时发生
     *
     * @param swoole_server $server
     * @param integer $worker_id
     */
    abstract public function onWorkerStart(\\swoole_server $server, int $worker_id);

    /**
     * Worker进程/Task进程终止时发生
     *
     * @param swoole_server $server
     * @param integer $worker_id
     */
    abstract public function onWorkerStop(\\swoole_server $server, int $worker_id);

    /**
     * 异步任务处理
     *
     * @param swoole_server $server
     * @param integer $taskId
     * @param integer $srcWorkerId
     * @param mixed $data
     */
    abstract public function onTask(\\swoole_server $server, int $taskId, int $srcWorkerId, mixed $data);

    /**
     * 异步任务处理完成
     *
     * @param swoole_server $server
     * @param integer $taskId
     * @param mixed $data
     */
    abstract public function onTaskFinish(\\swoole_server $server, int $taskId, mixed $data);

    /**
     * 客户端与服务器断开连接后的回调事件处理
     *
     * @param swoole_server $server
     * @param integer $fd
     */
    abstract public function onClose(\\swoole_server $server, $fd);

    /**
     * Server正常结束时的回调事件处理
     *
     * @param swoole_server $server
     */
    public function onShutdown(\\swoole_server $server)
    {
        $startedAt = $this->beforeExec();

        $this->stdout(\"**Server Stop**\\n\", Console::FG_GREEN);
        $this->stdout(\"master_pid: \");
        $this->stdout(\"{$server->master_pid}\\n\", Console::FG_BLUE);

        $this->onShutdownHandle($server);

        $this->afterExec($startedAt);
    }

    /**
     * Server启动在主进程的主线程时的自定义事件处理
     *
     * @param swoole_server $server
     */
    protected function onStartHandle(\\swoole_server $server)
    {

    }

    /**
     * Server正常结束时的自定义事件处理
     *
     * @param swoole_server $server
     */
    protected function onShutdownHandle(\\swoole_server $server)
    {

    }

    /**
     * 获取请求路由
     *
     * @param swoole_http_request $request
     */
    protected function getRoute(\\swoole_http_request $request)
    {
        return ltrim($request->server[\'request_uri\'], \'/\');
    }

    /**
     * 获取请求的GET参数
     *
     * @param swoole_http_request $request
     */
    protected function getParams(\\swoole_http_request $request)
    {
        return $request->get;
    }

    /**
     * 解析收到的数据
     *
     * @param string $data
     */
    protected function decodeData($data)
    {
        return json_decode($data, true);
    }

    /**
     * Before Exec
     */
    protected function beforeExec()
    {
        $startedAt = microtime(true);
        $this->stdout(date(\'Y-m-d H:i:s\') . \"\\n\", Console::FG_YELLOW);
        return $startedAt;
    }

    /**
     * After Exec
     */
    protected function afterExec($startedAt)
    {
        $duration = number_format(round(microtime(true) - $startedAt, 3), 3);
        $this->stdout(\"{$duration} s\\n\\n\", Console::FG_YELLOW);
    }

    /**
     * Prints a string to STDOUT.
     */
    protected function stdout($string)
    {
        if (Console::streamSupportsAnsiColors(\\STDOUT)) {
            $args = func_get_args();
            array_shift($args);
            $string = Console::ansiFormat($string, $args);
        }
        return Console::stdout($string);
    }
}

三:swoole操作类(继承swoole基类)

<?php
/**
 * @link http://www.u-bo.com
 * @copyright 南京友博网络科技有限公司
 * @license http://www.u-bo.com/license/
 */

namespace console\\swoole;

use Yii;
use yii\\db\\Query;
use yii\\helpers\\Console;
use yii\\helpers\\VarDumper;
use apps\\sqjc\\models\\WaterLevel;
use apps\\sqjc\\models\\WaterLevelLog;
use common\\models\\Bayonet;
use common\\models\\Device;
use common\\models\\DeviceCategory;

/**
 * Swoole Server测试类
 *
 * @author wangjian
 * @since 1.0
 */
class Server extends BaseServer
{
    /**
     * @inheritdoc
     */
    public function onConnect($server, $fd, $reactorId)
    {
        $startedAt = $this->beforeExec();

        $this->stdout(\"**Connection Open**\\n\", Console::FG_GREEN);
        $this->stdout(\"fd: \");
        $this->stdout(\"{$fd}\\n\", Console::FG_BLUE);

        $this->afterExec($startedAt);
    }

    /**
     * @inheritdoc
     */
    public function onReceive($server, $fd, $reactorId, $data)
    {
        $startedAt = $this->beforeExec();

        $this->stdout(\"**Received Message**\\n\", Console::FG_GREEN);

        $this->stdout(\"fd: \");
        $this->stdout(\"{$fd}\\n\", Console::FG_BLUE);

        $this->stdout(\"data: \");//接收的数据
        $this->stdout(\"{$data}\\n\", Console::FG_BLUE);



        $result = $server->send($fd, \'回复消息\');



        $this->afterExec($startedAt);
    }


    /**
     * @inheritdoc
     */
    public function onRequest($request, $response)
    {
        $startedAt = $this->beforeExec();

        $this->stdout(\"**HTTP Request**\\n\", Console::FG_GREEN);

        $this->stdout(\"fd: \");
        $this->stdout(\"{$request->fd}\\n\", Console::FG_BLUE);

        $response->status(200);
        $response->end(\'success\');

        $this->afterExec($startedAt);
    }

    /**
     * @inheritdoc
     */
    public function onClose($server, $fd)
    {
        $startedAt = $this->beforeExec();

        $this->stdout(\"**Connection Close**\\n\", Console::FG_GREEN);

        $this->stdout(\"fd: \");
        $this->stdout(\"{$fd}\\n\", Console::FG_BLUE);

        $this->afterExec($startedAt);
    }



    /**
     * @inheritdoc
     */
    public function onTask($server, $taskId, $srcWorkerId, $data)
    {
        $startedAt = $this->beforeExec();

        $this->stdout(\"New AsyncTask: \");
        $this->stdout(\"{$taskId}\\n\", Console::FG_BLUE);
        $this->stdout(\"{$data}\\n\", Console::FG_BLUE);

        $server->finish($data);

        $this->afterExec($startedAt);
    }

    /**
     * @inheritdoc
     */
    public function onWorkerStop($server, $worker_id)
    {
        // Yii::$app->db->close();
    }
    /**
     * @inheritdoc
     */
    public function onWorkerStart($server, $worker_id)
    {
        // Yii::$app->db->open();
    }


    /**
     * @inheritdoc
     */
    public function onTaskFinish($server, $taskId, $data)
    {
        $startedAt = $this->beforeExec();

        $this->stdout(\"AsyncTask finished: \");
        $this->stdout(\"{$taskId}\\n\", Console::FG_BLUE);

        $this->afterExec($startedAt);
    }
}

四:操作TCP服务

<?php
/**
 * @link http://www.u-bo.com
 * @copyright 南京友博网络科技有限公司
 * @license http://www.u-bo.com/license/
 */

namespace console\\controllers;

use Yii;
use yii\\helpers\\Console;
use yii\\helpers\\FileHelper;
use yii\\helpers\\ArrayHelper;
use console\\swoole\\Server;

/**
 * WebSocket Server controller.
 *
 * @see https://github.com/tystudy/yii2-swoole-websocket/blob/master/README.md
 *
 * @author wangjian
 * @since 1.0
 */
class SwooleController extends Controller
{
    /**
     * @var string 监听IP
     */
    public $host;
    /**
     * @var string 监听端口
     */
    public $port;
    /**
     * @var boolean 是否以守护进程方式启动
     */
    public $daemon = false;
    /**
     * @var boolean 是否启动测试类
     */
    public $test = false;

    /**
     * @var array Swoole参数配置项
     */
    private $_params;

    /**
     * @var array Swoole参数配置项(HTTP协议)
     */
    private $_http_params;
    /**
     * @var array Swoole参数配置项(TCP协议)
     */
    private $_tcp_params;

    /**
     * @inheritdoc
     */
    public function beforeAction($action)
    {
        if (parent::beforeAction($action)) {
            //判断是否开启swoole拓展
            if (!extension_loaded(\'swoole\')) {
                return false;
            }

            //获取swoole配置信息
            if (!isset(Yii::$app->params[\'swoole\'])) {
                return false;
            }
            $this->_params = Yii::$app->params[\'swoole\'];
            $this->_http_params = ArrayHelper::remove($this->_params, \'http\');
            $this->_tcp_params = ArrayHelper::remove($this->_params, \'tcp\');

            foreach ($this->_params as &$param) {
                if (strncmp($param, \'@\', 1) === 0) {
                    $param = Yii::getAlias($param);
                }
            }

            $this->_params = ArrayHelper::merge($this->_params, [
                \'daemonize\' => $this->daemon
            ]);

            return true;
        } else {
            return false;
        }
    }
    /**
     * 启动服务
     */
    public function actionStart()
    {
        if ($this->getPid() !== false) {
            $this->stdout(\"WebSocket Server is already started!\\n\", Console::FG_RED);
            return self::EXIT_CODE_NORMAL;
        }
        $server = new Server($this->_http_params, $this->_tcp_params, $this->_params);
        $server->run();
    }

    /**
     * 停止服务
     */
    public function actionStop()
    {
        $pid = $this->getPid();
        if ($pid === false) {
            $this->stdout(\"Tcp Server is already stoped!\\n\", Console::FG_RED);
            return self::EXIT_CODE_NORMAL;
        }

        \\swoole_process::kill($pid);
    }

    /**
     * 清理日志文件
     */
    public function actionClearLog()
    {
        $logFile = Yii::getAlias($this->_params[\'log_file\']);
        FileHelper::unlink($logFile);
    }

    /**
     * 获取进程PID
     *
     * @return false|integer PID
     */
    private function getPid()
    {
        $pidFile = $this->_params[\'pid_file\'];
        if (!file_exists($pidFile)) {
            return false;
        }

        $pid = file_get_contents($pidFile);
        if (empty($pid)) {
            return false;
        }

        $pid = intval($pid);
        if (\\swoole_process::kill($pid, 0)) {
            return $pid;
        } else {
            FileHelper::unlink($pidFile);
            return false;
        }
    }

    /**
     * @inheritdoc
     */
    public function options($actionID)
    {
        return ArrayHelper::merge(parent::options($actionID), [
            \'daemon\',
            \'test\'
        ]);
    }

    /**
     * @inheritdoc
     */
    public function optionAliases()
    {
        return ArrayHelper::merge(parent::optionAliases(), [
            \'d\' => \'daemon\',
            \'t\' => \'test\',
        ]);
    }
}

以上就是php使用swoole实现TCP服务的详细内容,更多关于php swoole实现TCP服务的资料请关注其它相关文章!

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容