Advertisement
Guest User

Untitled

a guest
Mar 26th, 2020
87
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
PHP 1.41 KB | None | 0 0
  1. <?php
  2.  
  3. class Message {
  4.     public function __construct($job) {
  5.         //@todo: set data accordingly
  6.     }
  7.  
  8.     public $timestamp = 123;
  9.     public $data      = [];
  10. }
  11.  
  12. class Subscriber {
  13.     private $run = true;
  14.  
  15.     public function __construct() {
  16.         declare(ticks = 1);
  17.         pcntl_signal(SIGTERM, function () {
  18.             $this->run = false;
  19.         });
  20.     }
  21.  
  22.     public function Subscribe(Callable $callback): void {
  23.         while ($this->run) {
  24.             $jobs = $this->selectJobs();
  25.             foreach ($jobs as $job) {
  26.                 $Message = new Message($job);
  27.                 if ($callback($Message)) {
  28.                     $this->markJobDone($job['id']);
  29.                 } else {
  30.                     $this->markJobFailed($job['id']);
  31.                 }
  32.             }
  33.         }
  34.     }
  35.  
  36.     private function selectJobs(): array {
  37.         //@todo: select && lock jobs from musql
  38.         return [];
  39.     }
  40.  
  41.     private function markJobDone(int $job_id) : void {
  42.         //@todo: delete from mysql
  43.     }
  44.  
  45.     private function markJobFailed(int $job_id) : void {
  46.         //@todo: log error, send job TO THE END OF THE QUEUE (otherwise failed jobs may stop queue to being processed)
  47.         //@todo: increase fail_counter. if fail_counter > threshold_value, do not select this job anymore => need manual processing
  48.     }
  49. }
  50.  
  51. class Worker {
  52.     public function run(): void {
  53.         (new Subscriber())->Subscribe([$this, 'processMessage']);
  54.     }
  55.  
  56.     private function processMessage(Message $Message): bool {
  57.         var_dump($Message->data);
  58.         // process message here
  59.         return true;
  60.     }
  61. }
  62.  
  63. (new Worker)->run();
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement