Advertisement
Guest User

Untitled

a guest
Mar 26th, 2020
94
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
PHP 1.52 KB | None | 0 0
  1. <?php
  2.  
  /**
   * Envelope handed to the consumer callback for a single queue job.
   *
   * Both properties currently keep their declared defaults: the constructor
   * receives the raw job but does not copy anything from it yet.
   */
  class Message
  {
      /** @var int Placeholder timestamp; stays 123 until the constructor is implemented. */
      public $timestamp = 123;

      /** @var array Job payload; stays empty until the constructor is implemented. */
      public $data = [];

      /**
       * @param mixed $job Raw job record as produced by the queue (currently unused).
       */
      public function __construct($job)
      {
          //@todo: set data accordingly
      }
  }
  11.  
  /**
   * Polls the job queue and hands every selected job to a consumer callback
   * until the process receives SIGTERM.
   *
   * Shutdown is cooperative: the signal handler only clears a flag, and the
   * polling loop stops once the batch it is currently working on is finished.
   */
  class Subscriber {
      /** Microseconds to sleep after a poll that returned no jobs, so an empty queue does not spin the CPU. */
      private const IDLE_SLEEP_US = 100000;

      /** @var bool Cleared by the SIGTERM handler to request a graceful stop. */
      private $run = true;

      /**
       * Installs the SIGTERM handler.
       *
       * The handler must return immediately. PHP runs it on the same thread as
       * Subscribe(), so waiting inside the handler for the loop to finish would
       * suspend that very loop and hang the process forever.
       */
      public function __construct() {
          // Deliver signals without relying on declare(ticks) being in effect
          // for whichever file happens to execute the polling loop.
          pcntl_async_signals(true);
          pcntl_signal(SIGTERM, function (): void {
              $this->run = false;
          });
      }

      /**
       * Runs the polling loop until SIGTERM is received.
       *
       * Each selected job is wrapped in a Message and passed to $callback. A
       * truthy return marks the job done, anything else marks it failed. A batch
       * that has been selected is always processed to the end before the stop
       * flag is re-checked.
       *
       * @param callable $callback Receives a Message; returns truthy on success.
       */
      public function Subscribe(callable $callback): void {
          while ($this->run) {
              $jobs = $this->selectJobs();

              if ($jobs === []) {
                  // Nothing to do: back off briefly instead of busy-looping.
                  usleep(self::IDLE_SLEEP_US);
                  continue;
              }

              foreach ($jobs as $job) {
                  $Message = new Message($job);
                  if ($callback($Message)) {
                      $this->markJobDone($job['id']);
                  } else {
                      $this->markJobFailed($job['id']);
                  }
              }
          }
      }

      /**
       * Fetches the next batch of jobs. Currently a stub that always returns an empty batch.
       *
       * @return array Jobs to process; Subscribe() reads an 'id' key from each entry.
       */
      private function selectJobs(): array {
          //@todo: select && lock jobs from MySQL
          return [];
      }

      /**
       * Records a successfully processed job. Currently a stub.
       *
       * @param int $job_id Identifier taken from the job's 'id' key.
       */
      private function markJobDone(int $job_id) : void {
          //@todo: delete from mysql
      }

      /**
       * Records a job whose callback reported failure. Currently a stub.
       *
       * @param int $job_id Identifier taken from the job's 'id' key.
       */
      private function markJobFailed(int $job_id) : void {
          //@todo: log error, send job TO THE END OF THE QUEUE (otherwise failed jobs may stop queue to being processed)
          //@todo: increase fail_counter. if fail_counter > threshold_value, do not select this job anymore => need manual processing
      }
  }
  55.  
  /**
   * Consumer that subscribes to the job queue and processes each delivered Message.
   */
  class Worker {
      /**
       * Creates a Subscriber and blocks in its polling loop, handling messages
       * with processMessage().
       */
      public function run(): void {
          // processMessage() is private, so the array form [$this, 'processMessage']
          // is not callable from inside Subscriber. Closure::fromCallable() resolves
          // the method in this scope, so the callback stays invocable there.
          (new Subscriber())->Subscribe(\Closure::fromCallable([$this, 'processMessage']));
      }

      /**
       * Handles a single message.
       *
       * @param Message $Message Message built by the Subscriber for one job.
       * @return bool Always true at present, which the Subscriber treats as success.
       */
      private function processMessage(Message $Message): bool {
          var_dump($Message->data);
          // process message here
          return true;
      }
  }
  67.  
  // Script entry point: start a worker and block in its subscription loop.
  (new Worker())->run();
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement