Advertisement
Guest User

Untitled

a guest
Mar 26th, 2020
69
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
PHP 1.44 KB | None | 0 0
  1. <?php
  2.  
  3. class Message {
  4.     public function __construct($job) {
  5.         //@todo: set data accordingly
  6.     }
  7.  
  8.     public $timestamp = 123;
  9.     public $data      = [];
  10. }
  11.  
  12. class Subscriber {
  13.     private $run = true;
  14.  
  15.     public function __construct() {
  16.         declare(ticks = 1);
  17.         pcntl_signal(SIGTERM, function () {
  18.             $this->run = false;
  19.         });
  20.     }
  21.  
  22.     public function Subscribe(Callable $callback): void {
  23.         while ($this->run) {
  24.             $jobs = $this->selectJobs();
  25.             foreach ($jobs as $job) {
  26.                 $Message = new Message($job);
  27.                 if ($callback($Message)) {
  28.                     $this->markJobDone($job['id']);
  29.                 } else {
  30.                     $this->markJobFailed($job['id']);
  31.                 }
  32.                 if (!$this->run) break;
  33.             }
  34.         }
  35.     }
  36.  
  37.     private function selectJobs(): array {
  38.         //@todo: select && lock jobs from musql
  39.         return [];
  40.     }
  41.  
  42.     private function markJobDone(int $job_id) : void {
  43.         //@todo: delete from mysql
  44.     }
  45.  
  46.     private function markJobFailed(int $job_id) : void {
  47.         //@todo: log error, send job TO THE END OF THE QUEUE (otherwise failed jobs may stop queue to being processed)
  48.         //@todo: increase fail_counter. if fail_counter > threshold_value, do not select this job anymore => need manual processing
  49.     }
  50. }
  51.  
  52. class Worker {
  53.     public function run(): void {
  54.         (new Subscriber())->Subscribe([$this, 'processMessage']);
  55.     }
  56.  
  57.     private function processMessage(Message $Message): bool {
  58.         var_dump($Message->data);
  59.         // process message here
  60.         return true;
  61.     }
  62. }
  63.  
  64. (new Worker)->run();
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement