项目开发中,如果有定时任务的业务要求,我们会使用linux的crontab来解决,但是它的最小粒度是分钟级别,如果要求粒度是秒级别的,甚至毫秒级别的,crontab就无法满足,值得庆幸的是swoole提供的强大的毫秒定时器。
应用场景举例
我们可能会遇到这样的场景:
场景一:每隔30秒获取一次本机内存使用率
场景二:2分钟后执行报表发送任务
场景三:每天凌晨2点钟定时请求第三方接口,如果接口有数据返回则停止任务,如果接口由于某种原因没有响应或者没有数据返回则5分钟后继续尝试请求该接口,尝试5次后仍然失败则停止该任务
Swoole提供了异步毫秒定时器函数:
swoole_timer_tick(int $msec, callable $callback):设置一个间隔时钟定时器,每隔$msec毫秒执行一次$callback,类似于javascript中的setInterval()。
swoole_timer_after(int $after_time_ms, mixed $callback_function):在指定的时间$after_time_ms后执行$callback_function,类似于javascript的setTimeout()。
swoole_timer_clear(int $timer_id):删除指定id的定时器,类似于javascript的clearInterval()。
对于场景一,经常用在系统检测统计方面,实时性要求比较高,但又能控制好频率,多用于后台服务器性能监控,可以生成可视化图表。可以是30秒获取一次内存使用率,也可以是10秒,而crontab最小粒度只能设置为1分钟。
swoole_timer_tick(30000, function($timer) use ($task_id) { // 启用定时器,每30秒执行一次 $memPercent = $this->getMemoryUsage(); //计算内存使用率 echo date('Y-m-d H:i:s') . '当前内存使用率:'.$memPercent."\n"; });
对于场景二,直接定义xx时间后执行某项任务的话,貌似crontab比较困难,而使用swoole的swoole_timer_after
可以实现:
swoole_timer_after(120000, function() use ($str) { //2分钟后执行 $this->sendReport(); //发送报表 echo "send report, $str\n"; });
对于场景三,用来作尝试请求,请求失败后继续,如果成功则停止请求。用crontab也能解决,但是比较傻,比如设置每隔5分钟请求一次,不管成功会失败都会去执行一次。而用swoole定时器则智能多了。
swoole_timer_tick(5*60*1000, function($timer) use ($url) { // 启用定时器,每5分钟执行一次 $rs = $this->postUrl($url); if ($rs) { //业务代码... swoole_timer_clear($timer); // 停止定时器 echo date('Y-m-d H:i:s'). "请求接口任务执行成功\n"; } else { echo date('Y-m-d H:i:s'). "请求接口失败,5分钟后再次尝试\n"; } });
swoole的定时任务有多种实现方式:
1、采用swoole的websocket 进行
timer.php
class Ws{ public $server=null; public $setAttrs=[]; CONST HOST="0.0.0.0"; const PORT=9998; public function __construct(){ $this->server=new swoole_websocket_server(self::HOST,self::PORT); $this->server->on("open",[$this,"onOpen"]); $this->server->on("message",[$this,"onMessage"]); $this->server->on("close",[$this,"onClose"]); } public function onOpen($server,$frame){ echo $frame->fd."---连接成功\n"; } public function onMessage($server,$frame){ //echo $frame->fd."----发来消息说:".$frame->data."\n"; /*$_dataAttr=[ 'task'=>1, 'fd'=>$frame->fd ]; $server->task($_dataAttr);*/ swoole_timer_tick(1000, function($timer) use ($server,$frame) { // 启用定时器,每30秒执行一次 $memPercent = $this->getMemoryUsage(); //计算内存使用率 $content=date('Y-m-d H:i:s') . '当前内存使用率:'.$memPercent; $server->push($frame->fd,"服务器返回消息说:".$content); }); //$server->push($frame->fd,"服务器返回消息说:".$frame->data."--".date("Y-m-d H:i:s")); } public function onClose($server,$fd){ echo $fd." 客户端已经关闭"; } public function getMemoryUsage() { // MEMORY if (false === ($str = @file("/proc/meminfo"))) return false; $str = implode("", $str); preg_match_all("/MemTotal\s{0,}\:+\s{0,}([\d\.]+).+?MemFree\s{0,}\:+\s{0,}([\d\.]+).+?Cached\s{0,}\:+\s{0,}([\d\.]+).+?SwapTotal\s{0,}\:+\s{0,}([\d\.]+).+?SwapFree\s{0,}\:+\s{0,}([\d\.]+)/s", $str, $buf); //preg_match_all("/Buffers\s{0,}\:+\s{0,}([\d\.]+)/s", $str, $buffers); $memTotal = round($buf[1][0]/1024, 2); $memFree = round($buf[2][0]/1024, 2); $memUsed = $memTotal - $memFree; $memPercent = (floatval($memTotal)!=0) ? round($memUsed/$memTotal*100,2):0; return $memPercent; } /** * @param $key worker_num * @param $val 2 */ public function set($key,$val){ $this->setAttrs[$key]=$val; } //启动服务 public function start(){ $this->server->set($this->setAttrs); $this->server->start(); } } $_ws=new Ws(); //echo $_ws->getMemoryUsage(); $_ws->set("worker_num",2); $_ws->set("daemonize",0); $_ws->start();
通过 php timer.php 运行程序
然后打开timer.html 文档:
timer.html
<!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title></title> <style type="text/css"> #container{ overflow-y: scroll; } #container div{ font-size: 14px; color:#666; padding: 5px 0px; margin:0px auto; width:95%; } </style> <script type="text/javascript"> window.onload=function(){ var container=document.getElementById("container"); var submitButton=document.getElementById("submit"); var message=null; var webSocket=new WebSocket("ws://172.28.81.248:9998"); webSocket.onopen=function(event){ webSocket.send("你好!"); console.log("服务器连接成功..."); } webSocket.onmessage=function(event){ console.log("服务器回复说:"+event.data); container.appendChild(createDiv(event.data)); } webSocket.onclose=function(event){ console.log("connect close"); } submitButton.onclick=function(){ message=document.getElementById("message").value; if(message){ webSocket.send(message); document.getElementById("message").value=""; }else{ alert("请输入内容后,点击发送"); } } function createDiv(message){ var div=document.createElement("div"); var textNode=document.createTextNode(message); div.appendChild(textNode); return div; } } </script> </head> <body> <div id="container" style="border:1px solid #ccc; height:360px;width:600px;"> </div> <input type="text" id="message"><input type="button" id="submit" value="发送"> </body> </html>
通过上面的几个步骤建立了 websocket的长连接服务,以上只是实现了简单的数据传输,可以接入 百度ecchat 试试图例
实现方式二、
采用TCP服务的异步任务实现:
<?php class Ws{ public $server=null; public $setAttrs=[]; CONST HOST="0.0.0.0"; const PORT=9997; public function __construct($config){ $this->server=new swoole_server(self::HOST,self::PORT); if(!empty($config)){ $this->setAttrs=array_merge($this->setAttrs,$config); } $this->server->on("task",[$this,"onTask"]); $this->server->on("finish",[$this,"onFinish"]); $this->server->on("connect",[$this,"onConnect"]); $this->server->on("receive",[$this,"onReceive"]); $this->server->on("close",[$this,"onClose"]); } public function onConnect($server,$fd){ echo $fd."---连接成功\n"; } public function onReceive($server,$fd,$from_id,$data){ echo "Get Message From Client {$fd}:{$data}\n"; $result['message']="success"; $server->send($fd,json_encode($result)); $server->task([]); //$server->push($frame->fd,"服务器返回消息说:".$frame->data."--".date("Y-m-d H:i:s")); } public function onClose($server,$fd){ echo $fd." 客户端已经关闭"; } public function onTask($server,$taskId,$workerId,$data){ swoole_timer_tick(1000,function($timer){ echo date('Y-m-d H:i:s')." ==== ".rand(10000,90000)." memTotal:".$this->getMemoryUsage()."\n"; }); } private function getMemoryUsage() { // MEMORY if (false === ($str = @file("/proc/meminfo"))) return false; $str = implode("", $str); preg_match_all("/MemTotal\s{0,}\:+\s{0,}([\d\.]+).+?MemFree\s{0,}\:+\s{0,}([\d\.]+).+?Cached\s{0,}\:+\s{0,}([\d\.]+).+?SwapTotal\s{0,}\:+\s{0,}([\d\.]+).+?SwapFree\s{0,}\:+\s{0,}([\d\.]+)/s", $str, $buf); //preg_match_all("/Buffers\s{0,}\:+\s{0,}([\d\.]+)/s", $str, $buffers); $memTotal = round($buf[1][0]/1024, 2); $memFree = round($buf[2][0]/1024, 2); $memUsed = $memTotal - $memFree; $memPercent = (floatval($memTotal)!=0) ? round($memUsed/$memTotal*100,2):0; return $memPercent; } public function onFinish($server,$taskId,$data){ echo $taskId." ---".$data."\n"; } /** * @param $key worker_num * @param $val 2 */ public function set($key,$val){ $this->setAttrs[$key]=$val; } //启动服务 public function start(){ $this->server->set($this->setAttrs); $this->server->start(); } } $_ws=new Ws([]); $_ws->set("worker_num",2); $_ws->set("task_worker_num",2); $_ws->set("daemonize",0); $_ws->start();
client.php
<?php $client=stream_socket_client('tcp://172.28.81.248:9997',$errno,$error); if(!$client){ print_r($errno); print_r($error); } $message=serialize([ 'message'=>'hellow'.time(), 'page'=>1, ]); fwrite($client,$message,mb_strlen($message,'utf-8')); $info=fread($client,1024); echo strlen($info); echo "服务器说:".$info."\n"; fclose($client);