SHARE
TWEET

Untitled

a guest Mar 26th, 2020 192 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. <?php
  2.  
  3. class Message {
  4.     public function __construct($job) {
  5.         //@todo: set data accordingly
  6.     }
  7.  
  8.     public $timestamp = 123;
  9.     public $data      = [];
  10. }
  11.  
  12. class Subscriber {
  13.     private $run = true;
  14.     private $job_done = false;
  15.  
  16.     public function __construct() {
  17.         declare(ticks = 1);
  18.         pcntl_signal(SIGTERM, function () {
  19.             $this->run = false;
  20.             while (!$this->job_done) {
  21.                 usleep(100000);
  22.             }
  23.         });
  24.     }
  25.  
  26.     public function Subscribe(Callable $callback): void {
  27.         while ($this->run) {
  28.             $jobs = $this->selectJobs();
  29.             foreach ($jobs as $job) {
  30.                 $Message = new Message($job);
  31.                 if ($callback($Message)) {
  32.                     $this->markJobDone($job['id']);
  33.                 } else {
  34.                     $this->markJobFailed($job['id']);
  35.                 }
  36.                 if (!$this->run) break;
  37.             }
  38.         }
  39.         $this->job_done = true;
  40.     }
  41.  
  42.     private function selectJobs(): array {
  43.         //@todo: select && lock jobs from musql
  44.         return [];
  45.     }
  46.  
  47.     private function markJobDone(int $job_id) : void {
  48.         //@todo: delete from mysql
  49.     }
  50.  
  51.     private function markJobFailed(int $job_id) : void {
  52.         //@todo: log error, send job TO THE END OF THE QUEUE (otherwise failed jobs may stop queue to being processed)
  53.         //@todo: increase fail_counter. if fail_counter > threshold_value, do not select this job anymore => need manual processing
  54.     }
  55. }
  56.  
  57. class Worker {
  58.     public function run(): void {
  59.         (new Subscriber())->Subscribe([$this, 'processMessage']);
  60.     }
  61.  
  62.     private function processMessage(Message $Message): bool {
  63.         var_dump($Message->data);
  64.         // process message here
  65.         return true;
  66.     }
  67. }
  68.  
  69. (new Worker)->run();
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
Top