作为程序员一定要保持良好的睡眠,才能好编程

swoole 定时器实现毫秒级定时任务

发布时间:2019-03-25

项目开发中,如果有定时任务的业务要求,我们会使用linux的crontab来解决,但是它的最小粒度是分钟级别,如果要求粒度是秒级别的,甚至毫秒级别的,crontab就无法满足,值得庆幸的是swoole提供的强大的毫秒定时器。


应用场景举例

我们可能会遇到这样的场景:

  • 场景一:每隔30秒获取一次本机内存使用率

  • 场景二:2分钟后执行报表发送任务

  • 场景三:每天凌晨2点钟定时请求第三方接口,如果接口有数据返回则停止任务,如果接口由于某种原因没有响应或者没有数据返回则5分钟后继续尝试请求该接口,尝试5次后仍然失败则停止该任务




Swoole提供了异步毫秒定时器函数:


swoole_timer_tick(int $msec, callable $callback):设置一个间隔时钟定时器,每隔$msec毫秒执行一次$callback,类似于javascript中的setInterval()。


swoole_timer_after(int $after_time_ms, mixed $callback_function):在指定的时间$after_time_ms后执行$callback_function,类似于javascript的setTimeout()。


swoole_timer_clear(int $timer_id):删除指定id的定时器,类似于javascript的clearInterval()。




对于场景一,经常用在系统检测统计方面,实时性要求比较高,但又能控制好频率,多用于后台服务器性能监控,可以生成可视化图表。可以是30秒获取一次内存使用率,也可以是10秒,而crontab最小粒度只能设置为1分钟。


swoole_timer_tick(30000, function($timer) use ($task_id) { // 启用定时器,每30秒执行一次
    $memPercent = $this->getMemoryUsage(); //计算内存使用率
    echo date('Y-m-d H:i:s') . '当前内存使用率:'.$memPercent."\n";
});


timer.png




对于场景二,直接定义xx时间后执行某项任务的话,貌似crontab比较困难,而使用swoole的swoole_timer_after可以实现:


swoole_timer_after(120000, function() use ($str) { //2分钟后执行
    $this->sendReport(); //发送报表
    echo "send report, $str\n";
});


对于场景三,用来作尝试请求,请求失败后继续,如果成功则停止请求。用crontab也能解决,但是比较傻,比如设置每隔5分钟请求一次,不管成功会失败都会去执行一次。而用swoole定时器则智能多了。


swoole_timer_tick(5*60*1000, function($timer) use ($url) { // 启用定时器,每5分钟执行一次
    $rs = $this->postUrl($url);    if ($rs) {        //业务代码...
        swoole_timer_clear($timer); // 停止定时器
        echo date('Y-m-d H:i:s'). "请求接口任务执行成功\n";
    } else {        echo date('Y-m-d H:i:s'). "请求接口失败,5分钟后再次尝试\n";
    }
});






swoole的定时任务有多种实现方式:


1、采用swoole的websocket 进行


timer.php

class Ws{ 
    public $server=null;
    public $setAttrs=[];
    CONST HOST="0.0.0.0";
    const PORT=9998;
    public function __construct(){
        $this->server=new swoole_websocket_server(self::HOST,self::PORT);
        $this->server->on("open",[$this,"onOpen"]);
        $this->server->on("message",[$this,"onMessage"]);
        $this->server->on("close",[$this,"onClose"]);
    }   

    public function onOpen($server,$frame){
            echo $frame->fd."---连接成功\n";
    }   
    
    public function onMessage($server,$frame){
        //echo $frame->fd."----发来消息说:".$frame->data."\n";
        /*$_dataAttr=[
            'task'=>1,
            'fd'=>$frame->fd
        ];
        $server->task($_dataAttr);*/
    
        swoole_timer_tick(1000, function($timer) use ($server,$frame) { // 启用定时器,每30秒执行一次
          $memPercent = $this->getMemoryUsage(); //计算内存使用率
          $content=date('Y-m-d H:i:s') . '当前内存使用率:'.$memPercent;
    
          $server->push($frame->fd,"服务器返回消息说:".$content);
        }); 
    
        //$server->push($frame->fd,"服务器返回消息说:".$frame->data."--".date("Y-m-d H:i:s"));
    }
    
    
    public function onClose($server,$fd){
        echo $fd." 客户端已经关闭";
    }

   public function getMemoryUsage()
    {
        // MEMORY
        if (false === ($str = @file("/proc/meminfo"))) return false;
        $str = implode("", $str);
        preg_match_all("/MemTotal\s{0,}\:+\s{0,}([\d\.]+).+?MemFree\s{0,}\:+\s{0,}([\d\.]+).+?Cached\s{0,}\:+\s{0,}([\d\.]+).+?SwapTotal\s{0,}\:+\s{0,}([\d\.]+).+?SwapFree\s{0,}\:+\s{0,}([\d\.]+)/s", $str, $buf);
        //preg_match_all("/Buffers\s{0,}\:+\s{0,}([\d\.]+)/s", $str, $buffers);

        $memTotal = round($buf[1][0]/1024, 2);
        $memFree = round($buf[2][0]/1024, 2);
        $memUsed = $memTotal - $memFree;
        $memPercent = (floatval($memTotal)!=0) ? round($memUsed/$memTotal*100,2):0;

        return $memPercent;
    }



    /**
     * @param $key worker_num
     * @param $val 2
     */
    public function set($key,$val){
        $this->setAttrs[$key]=$val;
    }
    
    //启动服务
    public function start(){
        $this->server->set($this->setAttrs);
        $this->server->start();
    }

}

$_ws=new Ws();

//echo $_ws->getMemoryUsage();
$_ws->set("worker_num",2);
$_ws->set("daemonize",0);
$_ws->start();


通过 php timer.php 运行程序


然后打开timer.html 文档:


timer.html

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title></title>
    <style type="text/css">
        #container{
            overflow-y: scroll;
        }
        #container div{
            font-size: 14px;
            color:#666;
            padding: 5px 0px;
            margin:0px auto;
            width:95%;

        }
    </style>

    <script type="text/javascript">

        window.onload=function(){
            var container=document.getElementById("container");
            var submitButton=document.getElementById("submit");
            var message=null;
            var webSocket=new WebSocket("ws://172.28.81.248:9998");

            webSocket.onopen=function(event){
                webSocket.send("你好!");
                console.log("服务器连接成功...");
            }

            webSocket.onmessage=function(event){
                console.log("服务器回复说:"+event.data);
                container.appendChild(createDiv(event.data));
            }

            webSocket.onclose=function(event){
                console.log("connect close");
            }

            submitButton.onclick=function(){
                message=document.getElementById("message").value;
                if(message){
                    webSocket.send(message);
                    document.getElementById("message").value="";
                }else{
                    alert("请输入内容后,点击发送");
                }
            }

            function createDiv(message){
                var div=document.createElement("div");
                var textNode=document.createTextNode(message);
                div.appendChild(textNode);
                return div;
            }
        }
    </script>
</head>
<body>
<div id="container" style="border:1px solid #ccc; height:360px;width:600px;">

</div>
<input type="text" id="message"><input type="button" id="submit" value="发送">

</body>
</html>



通过上面的几个步骤建立了 websocket的长连接服务,以上只是实现了简单的数据传输,可以接入 百度ecchat  试试图例 





实现方式二、


采用TCP服务的异步任务实现:


<?php

class Ws{
    public $server=null;
    public $setAttrs=[];
    CONST HOST="0.0.0.0";
    const PORT=9997;
    public function __construct($config){
				$this->server=new swoole_server(self::HOST,self::PORT);

				if(!empty($config)){
					$this->setAttrs=array_merge($this->setAttrs,$config);
				}

        $this->server->on("task",[$this,"onTask"]);
        $this->server->on("finish",[$this,"onFinish"]);
        $this->server->on("connect",[$this,"onConnect"]);
        $this->server->on("receive",[$this,"onReceive"]);
        $this->server->on("close",[$this,"onClose"]);
    }

    public function onConnect($server,$fd){
						echo $fd."---连接成功\n";
    }
        
    public function onReceive($server,$fd,$from_id,$data){
						echo "Get Message From Client {$fd}:{$data}\n";
						$result['message']="success";
						$server->send($fd,json_encode($result));
						$server->task([]);

        //$server->push($frame->fd,"服务器返回消息说:".$frame->data."--".date("Y-m-d H:i:s"));
    }

    public function onClose($server,$fd){
        echo $fd." 客户端已经关闭";
    }

    public function onTask($server,$taskId,$workerId,$data){
				
						swoole_timer_tick(1000,function($timer){
							

										echo date('Y-m-d H:i:s')." ==== ".rand(10000,90000)."   memTotal:".$this->getMemoryUsage()."\n";


						});


		}


    private function getMemoryUsage()
    {
        // MEMORY
        if (false === ($str = @file("/proc/meminfo"))) return false;
        $str = implode("", $str);
        preg_match_all("/MemTotal\s{0,}\:+\s{0,}([\d\.]+).+?MemFree\s{0,}\:+\s{0,}([\d\.]+).+?Cached\s{0,}\:+\s{0,}([\d\.]+).+?SwapTotal\s{0,}\:+\s{0,}([\d\.]+).+?SwapFree\s{0,}\:+\s{0,}([\d\.]+)/s", $str, $buf);
        //preg_match_all("/Buffers\s{0,}\:+\s{0,}([\d\.]+)/s", $str, $buffers);

        $memTotal = round($buf[1][0]/1024, 2);
        $memFree = round($buf[2][0]/1024, 2);
        $memUsed = $memTotal - $memFree;
        $memPercent = (floatval($memTotal)!=0) ? round($memUsed/$memTotal*100,2):0;

        return $memPercent;
    }

    public function onFinish($server,$taskId,$data){

        echo $taskId."  ---".$data."\n";

    }

    /**
     * @param $key worker_num
     * @param $val 2
     */
    public function set($key,$val){
        $this->setAttrs[$key]=$val;
    }

    //启动服务
    public function start(){
        $this->server->set($this->setAttrs);
        $this->server->start();
    }

}

$_ws=new Ws([]);
$_ws->set("worker_num",2);
$_ws->set("task_worker_num",2);
$_ws->set("daemonize",0);
$_ws->start();

view2.png


client.php


<?php

$client=stream_socket_client('tcp://172.28.81.248:9997',$errno,$error);

if(!$client){
    print_r($errno);
    print_r($error);
}

$message=serialize([
'message'=>'hellow'.time(),
'page'=>1,
]);

fwrite($client,$message,mb_strlen($message,'utf-8'));

$info=fread($client,1024);


echo strlen($info);

echo "服务器说:".$info."\n";

fclose($client);





view1.png