Advertisement
Guest User

Untitled

a guest
Mar 26th, 2020
84
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
PHP 1.25 KB | None | 0 0
  1. <?php
  2.  
  3. class Message {
  4.     public function __construct($job) {
  5.         //@todo: set data accordingly
  6.     }
  7.  
  8.     public $timestamp = 123;
  9.     public $data      = [];
  10. }
  11.  
  12. class Subscriber {
  13.     public function Subscribe(Callable $callback): void {
  14.         while (true) {
  15.             $jobs = $this->selectJobs();
  16.             foreach ($jobs as $job) {
  17.                 $Message = new Message($job);
  18.                 if ($callback($Message)) {
  19.                     $this->markJobDone($job['id']);
  20.                 } else {
  21.                     $this->markJobFailed($job['id']);
  22.                 }
  23.             }
  24.         }
  25.     }
  26.  
  27.     private function selectJobs(): array {
  28.         //@todo: select && lock jobs from musql
  29.         return [];
  30.     }
  31.  
  32.     private function markJobDone(int $job_id) : void {
  33.         //@todo: delete from mysql
  34.     }
  35.  
  36.     private function markJobFailed(int $job_id) : void {
  37.         //@todo: log error, send job TO THE END OF THE QUEUE (otherwise failed jobs may stop queue to being processed)
  38.         //@todo: increase fail_counter. if fail_counter > threshold_value, do not select this job anymore => need manual processing
  39.     }
  40. }
  41.  
  42. class Worker {
  43.     public function run(): void {
  44.         (new Subscriber())->Subscribe([$this, 'processMessage']);
  45.     }
  46.  
  47.     private function processMessage(Message $Message): bool {
  48.         var_dump($Message->data);
  49.         // process message here
  50.         return true;
  51.     }
  52. }
  53.  
  54. (new Worker)->run();
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement