Woher die Inspiration:
Eigentlich reizt mich PHP nicht mehr, vor allem nicht in meiner Freizeit, ich habe einfach schon viel zu viel davon gelesen/geschrieben und das nicht nur in meiner Freizeit. …Viel zu viel! …Alles …aber aberwitzig wurde es neulich, als es Probleme mit Server-Cronjobs gab, die PHP-Skripte starten, die wiederum per Http-Request neue PHP-Skripte starten. Also kurz gesagt ein Alptraum und schwerer Plaunungsfehler.
Was ich wollte:
– PHP-Skripte, die sich wie Services/Deamonen verhalten
– Jobs einem Worker übergeben, der diese Zyklisch ausführt (z.b. alle 10 Minuten)
– der Worker soll nach bedarf neue Worker öffnen (per Http-Request)
– wenn man den ersten Worker beendet, sollen sich alle Child-Worker auch beenden
…und das in PHP 🙁
Also hier meine Antwort (github):
Der erste gestartete Worker lockt eine Datei und startet nacheinander seine Jobs. Konnte er die Jobs nicht in einem konfigurierbaren Intervall abarbeiten, startet er einen neuen Worker per Http-Request, anhand der gleichen Url, mit der er selbst gestartet wurde. Jeder Child-Worker kann selbst anhand der Konfiguration neue Childs erzeugen oder sich ggf. selbst beenden.
M/Worker/Test.php (github)
class M_Worker_Test { public function run() { $config = array('maxLoopSleepMs' => 4000, 'minLoopSleepMs' => 1000, 'loopSleepAdjustmentRate' => 2, 'forkAfterNoSleepCycles' => 2, 'destroyAfterSleepCycles' => 2, 'maxWorker' => 10); $job = new M_Worker_Job('testJob'); //job needs 1-6secs $worker = new M_Worker('testWorker', array($job), $config); $worker->run(); } }
und hier das Herzstück(gekürzt) M/Worker.php (github)
ignore_user_abort(true); ini_set('max_execution_time', 0); ob_implicit_flush(true); class M_Worker { [...] public function __construct($workerName, array $jobs, array $config = null) { $this->uid = uniqid('M_Id'); $this->workerName = $workerName; if($config) $this->config = array_merge($this->config, $config); foreach($jobs as $job) $this->addJob($job); } public function run() { $this->triggerInit(); while(!$this->stopped) { $lastRunStartTime = microtime(true); foreach($this->jobQueue as $job) $job->runJob($this); $lastRunDuration = (int)((microtime(true) - $lastRunStartTime) * 1000); $this->calibrateAndDoLoopSleep($lastRunDuration); $this->forkOrDestroyIfNeeded(); } $this->triggerShoutDown(); } protected function triggerInit() { $this->actualLoopSleep = $this->getConfigValue('maxLoopSleepMs') / 2; $this->stopped = false; if($this->getConfigValue('outputLogFile')) $this->workerLogFileHandle = fopen($this->getConfigValue('outputLogFile'), 'a'); $this->isMainWorker = $this->getMainLock(); if($this->isMainWorker) { ignore_user_abort(false); $this->initWorkerCounter(); } else { header("Connection: close"); $this->runningWorker = $this->countRunningWorker(); $this->incrementWorkerCounter(); } foreach($this->jobQueue as $job) $job->onJobInit($this); } protected function triggerShoutdown() { foreach($this->jobQueue as $job) $job->onJobDestroy($this); if(!$this->isMainWorker) $this->decrementWorkerCounter(); if(is_resource($this->workerLogFileHandle)) fclose($this->workerLogFileHandle); $this->releaseMainLock(); } protected function calibrateAndDoLoopSleep($lastRunDuration) { if($lastRunDuration > $this->actualLoopSleep) { $this->sleepCycles = 0; ++$this->noSleepCycles; $this->actualLoopSleep = max( $this->actualLoopSleep / $this->getConfigValue('loopSleepAdjustmentRate'), $this->getConfigValue('minLoopSleepMs')); } else { $this->noSleepCycles = 0; ++$this->sleepCycles; $this->actualLoopSleep = min( $this->actualLoopSleep * $this->getConfigValue('loopSleepAdjustmentRate'), $this->getConfigValue('maxLoopSleepMs')); $timeToSleep = (int)($this->actualLoopSleep - $lastRunDuration); usleep($timeToSleep); } } protected function forkOrDestroyIfNeeded() { if($this->noSleepCycles > $this->getConfigValue('forkAfterNoSleepCycles') && $this->countRunningWorker() < $this->getConfigValue('maxWorker')) { $this->fork(); } elseif(!$this->isMainWorker &&( $this->sleepCycles > $this->getConfigValue('destroyAfterSleepCycles') || $this->getMainLock() )) { $this->releaseMainLock(); $this->stopped = true; } } protected function getMainLock() { if(!is_resource($this->mainWorkerLockFileHandle)) { $this->mainWorkerLockFileHandle = fopen('./M_Worker_'.ucfirst($this->workerName).'.lock', 'r'); } return is_resource($this->mainWorkerLockFileHandle) && @flock($this->mainWorkerLockFileHandle, LOCK_EX | LOCK_NB); } protected function fork() { $url = 'http://'.$_SERVER['HTTP_HOST'].':'.$_SERVER['SERVER_PORT'].$_SERVER['REQUEST_URI']; $fp = fopen($url, 'r'); if(is_resource($fp)) fclose($fp); } [...] }