简单多进程任务处理程序

php

浏览数:1,463

2019-1-8

<?php /** * 多进程任务处理辅助类 */ class TaskHelper{ /** * worker进程最大数量, 至少两个 */ protected $maxProcess; /** * 动态参数,设置为 0 表示不使用自适应进程方式 */ protected $dynamicParam; /** * 任务的实际处理者,对象, 必须有 runWorker 方法 */ protected $worker; public function __construct($worker, $maxProcess = 4, $dynamicParam = 0) { $this->worker = $worker;
        $this->maxProcess = max(2, (int)$maxProcess);
        $this->dynamicParam = max(0, (int)$dynamicParam);
    }
    
    /**
     * fork子进程处理数据
     * @param Array $data 需要处理的数据,必须是数组
     */
    public function run(&$data) {
        $count = count($data);
        
        // 需要开启的子进程数
        $num = $this->dynamicParam ? min( $this->maxProcess, ceil($count / $this->dynamicParam) ) : $this->maxProcess;
        
        // 每个进程处理的数据量
        $n = ceil($count / $num); 
        
        $childs = array();
        
        for($i = 0; $i < $count; $i += $n) { $pid = pcntl_fork(); if($pid == -1) { echo "Fork worker failed!"; return false; } if($pid) { echo "Fork worker success! pid:", $pid, "\n"; $childs[] = $pid; } else { $sliceData = array_slice($data, $i, $n); $this->worker->runWorker($sliceData);
                exit();
            }
        }
        
        $this->check($childs);
    }
    
    /**
     * 检测子进程状态,监控子进程是否退出,并防止僵尸进程
     */
    protected function check($childs) {
        while(true) {
            foreach($childs as $index => $pid) {
                $pid && $res = pcntl_waitpid($pid, $status, WNOHANG);
                if(!$pid || $res == -1) {
                    echo "End worker: $pid \n";
                    unset($childs[$index]);
                }
            }
            
            if(empty($childs)) break;
            sleep(1);
        }
    }
}

/**
 * 使用示例
 */
class Test {
    public function run() {
        $data = array_fill(0, 800, 1);
        
        // 开8个进程将 $data 分成8份,交由下面的 runWorker 方法处理
        $task = new TaskHelper($this, 8);
        
        $task->run($data); // 如果前面连接了数据库、redis等,最好在这之前关闭掉
    }
    
    /**
     * 这里编写代相应码来处理数据
     */
    public function runWorker($data) {
        // do something...
    }
}

$obj = new Test;
$obj->run();