M_Worker

Woher die Inspiration:

Eigentlich reizt mich PHP nicht mehr, vor allem nicht in meiner Freizeit, ich habe einfach schon viel zu viel davon gelesen/geschrieben und das nicht nur in meiner Freizeit. …Viel zu viel! …Alles …aber aberwitzig wurde es neulich, als es Probleme mit Server-Cronjobs gab, die PHP-Skripte starten, die wiederum per Http-Request neue PHP-Skripte starten. Also kurz gesagt ein Alptraum und schwerer Plaunungsfehler.

Was ich wollte:

– PHP-Skripte, die sich wie Services/Deamonen verhalten
– Jobs einem Worker übergeben, der diese Zyklisch ausführt (z.b. alle 10 Minuten)
– der Worker soll nach bedarf neue Worker öffnen (per Http-Request)
– wenn man den ersten Worker beendet, sollen sich alle Child-Worker auch beenden
…und das in PHP 🙁

Also hier meine Antwort (github):

Der erste gestartete Worker lockt eine Datei und startet nacheinander seine Jobs. Konnte er die Jobs nicht in einem konfigurierbaren Intervall abarbeiten, startet er einen neuen Worker per Http-Request, anhand der gleichen Url, mit der er selbst gestartet wurde. Jeder Child-Worker kann selbst anhand der Konfiguration neue Childs erzeugen oder sich ggf. selbst beenden.

M/Worker/Test.php (github)

class M_Worker_Test
{
  public function run()
 {           
    $config = array('maxLoopSleepMs' => 4000,    
            'minLoopSleepMs' => 1000,
            'loopSleepAdjustmentRate' => 2,  
            'forkAfterNoSleepCycles' => 2, 
            'destroyAfterSleepCycles' => 2,  
            'maxWorker' => 10);

    $job = new M_Worker_Job('testJob');  //job needs 1-6secs

    $worker = new M_Worker('testWorker', array($job), $config);

   $worker->run();
  }
}

und hier das Herzstück(gekürzt) M/Worker.php (github)

ignore_user_abort(true);
ini_set('max_execution_time', 0);
ob_implicit_flush(true);

class M_Worker
{ 
    [...]

 public function __construct($workerName, array $jobs, array $config = null)
 {
   $this->uid = uniqid('M_Id');
   $this->workerName = $workerName;

   if($config)
     $this->config = array_merge($this->config, $config);

    foreach($jobs as $job)
      $this->addJob($job);
 }

 public function run()
 {     
    $this->triggerInit();        
    while(!$this->stopped)
   {     
      $lastRunStartTime = microtime(true);    

      foreach($this->jobQueue as $job)
       $job->runJob($this); 

      $lastRunDuration = (int)((microtime(true) - $lastRunStartTime) * 1000);

     $this->calibrateAndDoLoopSleep($lastRunDuration);

      $this->forkOrDestroyIfNeeded();    
    }   
    $this->triggerShoutDown();
 } 

  protected function triggerInit()
  {     
    $this->actualLoopSleep = $this->getConfigValue('maxLoopSleepMs') / 2;

   $this->stopped = false;

    if($this->getConfigValue('outputLogFile'))
     $this->workerLogFileHandle = fopen($this->getConfigValue('outputLogFile'), 'a');

    $this->isMainWorker = $this->getMainLock();

   if($this->isMainWorker)
    {           
      ignore_user_abort(false);
     $this->initWorkerCounter();
   }
   else 
   { 
      header("Connection: close"); 
     $this->runningWorker = $this->countRunningWorker();
     $this->incrementWorkerCounter();
   } 

    foreach($this->jobQueue as $job)
     $job->onJobInit($this);      
  }

 protected function triggerShoutdown()
 {   
    foreach($this->jobQueue as $job)
     $job->onJobDestroy($this);

   if(!$this->isMainWorker)
     $this->decrementWorkerCounter();     

    if(is_resource($this->workerLogFileHandle))
      fclose($this->workerLogFileHandle);

    $this->releaseMainLock();
  }

 protected function calibrateAndDoLoopSleep($lastRunDuration)
  {   
    if($lastRunDuration > $this->actualLoopSleep)
   { 
      $this->sleepCycles = 0;    
      ++$this->noSleepCycles;            
      $this->actualLoopSleep = max( $this->actualLoopSleep 
                          / $this->getConfigValue('loopSleepAdjustmentRate'),
                           $this->getConfigValue('minLoopSleepMs'));
    }
   else 
   {
     $this->noSleepCycles = 0;
      ++$this->sleepCycles;      
      $this->actualLoopSleep = min( $this->actualLoopSleep 
                       * $this->getConfigValue('loopSleepAdjustmentRate'),
                          $this->getConfigValue('maxLoopSleepMs'));

      $timeToSleep = (int)($this->actualLoopSleep - $lastRunDuration);
     usleep($timeToSleep);
   } 
  } 

  protected function forkOrDestroyIfNeeded()
  {
   if($this->noSleepCycles > $this->getConfigValue('forkAfterNoSleepCycles')
    && $this->countRunningWorker() < $this->getConfigValue('maxWorker'))
   {         
      $this->fork();
   }
   elseif(!$this->isMainWorker
    &&(     
         $this->sleepCycles > $this->getConfigValue('destroyAfterSleepCycles')
     || $this->getMainLock() ))
   {     
      $this->releaseMainLock();
      $this->stopped = true;
   }           
  }

 protected function getMainLock()
  {
   if(!is_resource($this->mainWorkerLockFileHandle))
    {
     $this->mainWorkerLockFileHandle = 
         fopen('./M_Worker_'.ucfirst($this->workerName).'.lock', 'r');  
    }
   return is_resource($this->mainWorkerLockFileHandle) 
        && @flock($this->mainWorkerLockFileHandle, LOCK_EX | LOCK_NB);
  }

 protected function fork()
 {
   $url = 'http://'.$_SERVER['HTTP_HOST'].':'.$_SERVER['SERVER_PORT'].$_SERVER['REQUEST_URI'];
   $fp = fopen($url, 'r');   
    if(is_resource($fp))
      fclose($fp);
  }

   [...]
}