blog.tomhanoldt.info

M_Worker

M_Worker
May 18, 2011

Woher die Inspiration:
Eigentlich reizt mich PHP nicht mehr, vor allem nicht in meiner Freizeit, ich habe einfach schon viel zu viel davon gelesen/geschrieben und das nicht nur in meiner Freizeit. ...Viel zu viel! ...Alles ...aber aberwitzig wurde es neulich, als es Probleme mit Server-Cronjobs gab, die PHP-Skripte starten, die wiederum per HTTP-Request neue PHP-Skripte starten. Also kurz gesagt ein Alptraum und ein schwerer Planungsfehler.

Was ich wollte:

  • PHP-Skripte, die sich wie Services/Daemons verhalten
  • Jobs einem Worker übergeben, der diese zyklisch ausführt (z. B. alle 10 Minuten)
  • der Worker soll nach Bedarf neue Worker öffnen (per HTTP-Request)
  [...]
 
  /**
   * Sets up a worker: assigns a fresh unique id, stores the name, applies
   * optional config overrides and registers the given jobs.
   *
   * @param string     $workerName name stored on the worker
   * @param array      $jobs       jobs, each passed to addJob() in iteration order
   * @param array|null $config     optional overrides merged over $this->config
   */
  public function __construct($workerName, array $jobs, array $config = null)
  {
    $this->workerName = $workerName;
    $this->uid = uniqid('M_Id');

    // null and the empty array are both falsy, so neither triggers a merge.
    if (!empty($config)) {
      $this->config = array_merge($this->config, $config);
    }

    foreach ($jobs as $queuedJob) {
      $this->addJob($queuedJob);
    }
  }
 
  /**
   * Main loop of the worker.
   *
   * Initialises the worker, then repeatedly runs every queued job, adjusts
   * and performs the loop sleep, and checks whether to fork or stop. Once
   * the stop flag is set the shutdown hook is triggered.
   */
  public function run()
  {
    $this->triggerInit();

    while (!$this->stopped) {
      $cycleStartedAt = microtime(true);

      foreach ($this->jobQueue as $queuedJob) {
        $queuedJob->runJob($this);
      }

      // Wall-clock time of this cycle, truncated to whole milliseconds.
      $elapsedSeconds = microtime(true) - $cycleStartedAt;
      $cycleDurationMs = (int)($elapsedSeconds * 1000);

      $this->calibrateAndDoLoopSleep($cycleDurationMs);
      $this->forkOrDestroyIfNeeded();
    }

    // PHP resolves method names case-insensitively; spelled like the declaration.
    $this->triggerShoutdown();
  }
 
  /**
   * Prepares the worker before the loop starts.
   *
   * Seeds the loop sleep with half of the configured maximum, clears the stop
   * flag, opens the optional log file in append mode, tries to take the main
   * lock, and finally notifies every queued job via onJobInit().
   */
  protected function triggerInit()
  {
    $this->actualLoopSleep = $this->getConfigValue('maxLoopSleepMs') / 2;
    $this->stopped = false;

    if ($this->getConfigValue('outputLogFile')) {
      $this->workerLogFileHandle = fopen($this->getConfigValue('outputLogFile'), 'a');
    }

    // Whoever obtains the lock is the main worker; everyone else is secondary.
    $this->isMainWorker = $this->getMainLock();

    if (!$this->isMainWorker) {
      // Secondary worker: send a "Connection: close" header, then register
      // itself in the shared worker counter.
      header("Connection: close");
      $this->runningWorker = $this->countRunningWorker();
      $this->incrementWorkerCounter();
    } else {
      ignore_user_abort(false);
      $this->initWorkerCounter();
    }

    foreach ($this->jobQueue as $queuedJob) {
      $queuedJob->onJobInit($this);
    }
  }
 
  /**
   * Tears the worker down after the loop has ended.
   *
   * Notifies every queued job via onJobDestroy(), decrements the worker
   * counter for non-main workers, closes the log file if one is open and
   * releases the main lock.
   */
  protected function triggerShoutdown()
  {
    foreach ($this->jobQueue as $queuedJob) {
      $queuedJob->onJobDestroy($this);
    }

    if (!$this->isMainWorker) {
      $this->decrementWorkerCounter();
    }

    $logHandle = $this->workerLogFileHandle;
    if (is_resource($logHandle)) {
      fclose($logHandle);
    }

    $this->releaseMainLock();
  }
 
  /**
   * Adapts the loop sleep to the last cycle's duration and sleeps if there is
   * time left.
   *
   * If the cycle took longer than the current loop sleep, the sleep is divided
   * by 'loopSleepAdjustmentRate' (not below 'minLoopSleepMs') and no sleep
   * happens. Otherwise the sleep is multiplied by that rate (not above
   * 'maxLoopSleepMs') and the worker sleeps for the remainder of the interval.
   *
   * @param int $lastRunDuration duration of the last job cycle in milliseconds
   */
  protected function calibrateAndDoLoopSleep($lastRunDuration)
  {
    if ($lastRunDuration > $this->actualLoopSleep) {
      // Busy cycle: count it and shorten the target interval.
      $this->sleepCycles = 0;
      ++$this->noSleepCycles;
      $this->actualLoopSleep = max(
        $this->actualLoopSleep / $this->getConfigValue('loopSleepAdjustmentRate'),
        $this->getConfigValue('minLoopSleepMs')
      );

      return;
    }

    // Idle cycle: count it and lengthen the target interval.
    $this->noSleepCycles = 0;
    ++$this->sleepCycles;
    $this->actualLoopSleep = min(
      $this->actualLoopSleep * $this->getConfigValue('loopSleepAdjustmentRate'),
      $this->getConfigValue('maxLoopSleepMs')
    );

    $timeToSleepMs = (int)($this->actualLoopSleep - $lastRunDuration);

    // All values here are milliseconds, but usleep() takes microseconds.
    // Skip non-positive values: a rate below 1 could make the remainder
    // negative, which usleep() rejects.
    if ($timeToSleepMs > 0) {
      usleep($timeToSleepMs * 1000);
    }
  }
 
  /**
   * Decides after each cycle whether to start another worker or stop this one.
   *
   * Forks when the number of consecutive no-sleep cycles exceeds
   * 'forkAfterNoSleepCycles' and fewer than 'maxWorker' workers are counted.
   * Otherwise a non-main worker stops itself when it has exceeded
   * 'destroyAfterSleepCycles' sleep cycles or when it manages to obtain the
   * main lock; the lock is released again before stopping.
   */
  protected function forkOrDestroyIfNeeded()
  {
    // The worker count is only queried when the overload threshold is passed.
    $overloaded = $this->noSleepCycles > $this->getConfigValue('forkAfterNoSleepCycles');
    if ($overloaded && $this->countRunningWorker() < $this->getConfigValue('maxWorker')) {
      $this->fork();
      return;
    }

    // The main worker never stops itself here.
    if ($this->isMainWorker) {
      return;
    }

    // The lock attempt only happens when the idle threshold is not exceeded.
    $idleTooLong = $this->sleepCycles > $this->getConfigValue('destroyAfterSleepCycles');
    if ($idleTooLong || $this->getMainLock()) {
      $this->releaseMainLock();
      $this->stopped = true;
    }
  }
 
  /**
   * Tries to obtain the exclusive, non-blocking main-worker lock.
   *
   * The lock file handle is opened lazily and kept on the instance so that
   * later calls reuse it.
   *
   * @return bool true if this process holds the lock, false otherwise
   */
  protected function getMainLock()
  {
    if (!is_resource($this->mainWorkerLockFileHandle)) {
      // Mode 'c' creates the lock file when it is missing and never truncates
      // it. The previous mode 'r' failed on a missing file, so no worker
      // could ever become the main worker on a fresh installation.
      $this->mainWorkerLockFileHandle =
        fopen('./M_Worker_' . ucfirst($this->workerName) . '.lock', 'c');
    }

    // LOCK_NB makes flock() return immediately instead of waiting for the lock.
    return is_resource($this->mainWorkerLockFileHandle)
      && @flock($this->mainWorkerLockFileHandle, LOCK_EX | LOCK_NB);
  }
 
  /**
   * Starts another worker by requesting the current script again over HTTP.
   *
   * The URL is rebuilt from the current request's host, server port and
   * request URI. The response body is not read; the stream is closed as soon
   * as it has been opened.
   */
  protected function fork()
  {
    // HTTP_HOST mirrors the Host header, which already carries ":port" on
    // non-default ports. Strip it so the port is not appended twice
    // ("host:8080:8080"). A bracketed IPv6 host without port is left as is.
    $host = preg_replace('/:\d+$/', '', $_SERVER['HTTP_HOST']);

    $url = 'http://' . $host . ':' . $_SERVER['SERVER_PORT'] . $_SERVER['REQUEST_URI'];
    $fp = fopen($url, 'r');

    if (is_resource($fp)) {
      fclose($fp);
    }
  }
 
      [...]
 
      $this->stopped = true;
   }
  }
 
  /**
   * Tries to obtain the exclusive, non-blocking main-worker lock.
   *
   * The lock file handle is opened lazily and kept on the instance so that
   * later calls reuse it.
   *
   * @return bool true if this process holds the lock, false otherwise
   */
  protected function getMainLock()
  {
    if (!is_resource($this->mainWorkerLockFileHandle)) {
      // Mode 'c' creates the lock file when it is missing and never truncates
      // it. The previous mode 'r' failed on a missing file, so no worker
      // could ever become the main worker on a fresh installation.
      $this->mainWorkerLockFileHandle =
        fopen('./M_Worker_' . ucfirst($this->workerName) . '.lock', 'c');
    }

    // LOCK_NB makes flock() return immediately instead of waiting for the lock.
    return is_resource($this->mainWorkerLockFileHandle)
      && @flock($this->mainWorkerLockFileHandle, LOCK_EX | LOCK_NB);
  }
 
  /**
   * Starts another worker by requesting the current script again over HTTP.
   *
   * The URL is rebuilt from the current request's host, server port and
   * request URI. The response body is not read; the stream is closed as soon
   * as it has been opened.
   */
  protected function fork()
  {
    // HTTP_HOST mirrors the Host header, which already carries ":port" on
    // non-default ports. Strip it so the port is not appended twice
    // ("host:8080:8080"). A bracketed IPv6 host without port is left as is.
    $host = preg_replace('/:\d+$/', '', $_SERVER['HTTP_HOST']);

    $url = 'http://' . $host . ':' . $_SERVER['SERVER_PORT'] . $_SERVER['REQUEST_URI'];
    $fp = fopen($url, 'r');

    if (is_resource($fp)) {
      fclose($fp);
    }
  }
 
   [...]
}
Back to latest