Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- <?php
/**
 * Envelope passed to the subscriber callback for one queued job.
 *
 * Both properties currently hold their placeholder defaults: the constructor
 * accepts the raw job but does not copy anything from it yet.
 */
class Message {
    /** @var int Placeholder value; not derived from the job yet. */
    public $timestamp = 123;

    /** @var array Payload exposed to the consumer; stays empty for now. */
    public $data = [];

    /**
     * @param mixed $job Raw job record as selected from the queue (currently ignored).
     */
    public function __construct($job) {
        // @todo: populate $timestamp and $data from $job
    }
}
/**
 * Polls the job queue and hands every job to a consumer callback until the
 * process receives SIGTERM.
 *
 * Shutdown is cooperative: the signal handler only clears the run flag, and
 * Subscribe() returns once the job that is currently being processed has been
 * marked done or failed.
 */
class Subscriber {
    /** Pause between polls when the queue is empty, in microseconds. */
    private const IDLE_SLEEP_MICROSECONDS = 100000;

    /** @var bool Cleared by the SIGTERM handler to request a graceful stop. */
    private $run = true;

    /**
     * Installs the SIGTERM handler.
     *
     * Requires the pcntl extension.
     */
    public function __construct() {
        // Asynchronous dispatch replaces declare(ticks = 1): handlers run between
        // opcodes without a per-file ticks directive.
        pcntl_async_signals(true);

        // The handler must return immediately. It runs on the same thread as
        // Subscribe(), so waiting here for the loop to finish would block the
        // loop itself and hang the process.
        pcntl_signal(SIGTERM, function (): void {
            $this->run = false;
        });
    }

    /**
     * Runs the consume loop until a stop is requested.
     *
     * @param callable $callback Receives a Message; a truthy return marks the
     *                           job done, a falsy return marks it failed.
     */
    public function Subscribe(callable $callback): void {
        while ($this->run) {
            $jobs = $this->selectJobs();

            if ($jobs === []) {
                // Nothing to do: back off instead of spinning on the job store.
                usleep(self::IDLE_SLEEP_MICROSECONDS);
                continue;
            }

            foreach ($jobs as $job) {
                $Message = new Message($job);
                if ($callback($Message)) {
                    $this->markJobDone($job['id']);
                } else {
                    $this->markJobFailed($job['id']);
                }

                // Stop after the current job; the rest of the batch is left
                // unprocessed.
                if (!$this->run) {
                    break;
                }
            }
        }
    }

    /**
     * Fetches the next batch of jobs.
     *
     * @return array Job records; each is expected to carry an 'id' key.
     */
    private function selectJobs(): array {
        //@todo: select && lock jobs from MySQL
        return [];
    }

    /**
     * Records a successfully processed job.
     */
    private function markJobDone(int $job_id): void {
        //@todo: delete from mysql
    }

    /**
     * Records a failed job.
     */
    private function markJobFailed(int $job_id): void {
        //@todo: log error, send job TO THE END OF THE QUEUE (otherwise failed jobs may block the queue from being processed)
        //@todo: increase fail_counter. if fail_counter > threshold_value, do not select this job anymore => needs manual processing
    }
}
/**
 * Queue consumer: wires its message handler into a Subscriber and runs it.
 */
class Worker {
    /**
     * Starts consuming; returns only when the subscriber's loop ends.
     */
    public function run(): void {
        // processMessage() is private, so an array callable would not be
        // callable from Subscriber's scope. Closure::fromCallable() is resolved
        // here, inside Worker, and keeps this scope bound to the closure.
        $handler = \Closure::fromCallable([$this, 'processMessage']);

        (new Subscriber())->Subscribe($handler);
    }

    /**
     * Handles a single message.
     *
     * @return bool true when the message was processed successfully.
     */
    private function processMessage(Message $Message): bool {
        var_dump($Message->data);
        // process message here
        return true;
    }
}
// Script entry point: build a worker and start consuming.
(new Worker())->run();
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement