Guest User

Untitled

a guest
Jun 19th, 2018
89
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
PHP 3.23 KB | None | 0 0
  1. <?php
  2. require_once("pgq/PGQRemoteConsumer.php");
  3.  
  /**
   * PGQCoopConsumer is a PGQConsumer that acts as one subconsumer of a
   * cooperative (pgq_coop) consumer: it registers and unregisters itself
   * through pgq_coop.register_subconsumer() / unregister_subconsumer() and
   * obtains and closes batches through pgq_coop.next_batch() /
   * pgq_coop.finish_batch().
   *
   * All statements are run on the source connection $this->pg_src_con and
   * logged through $this->log, both of which are provided by the parent
   * class.
   */
  abstract class PGQCoopConsumer extends PGQConsumer
  {
    protected $sname; // subconsumer name

    // When not null, sent as the 4th argument of pgq_coop.next_batch().
    // NOTE(review): presumably the interval after which another
    // subconsumer is considered dead - confirm against the pgq_coop docs.
    protected $timeout = null;

    /**
     * Stores the subconsumer name, then delegates everything else to the
     * parent constructor.
     *
     * @param string $sname      subconsumer name
     * @param string $cname      consumer name (forwarded to the parent)
     * @param string $qname      queue name (forwarded to the parent)
     * @param int    $argc       forwarded to the parent
     * @param array  $argv       forwarded to the parent
     * @param string $src_constr forwarded to the parent
     */
    public function __construct($sname, $cname, $qname, $argc, $argv, $src_constr)
    {
      // Set before calling the parent, in case the parent constructor
      // already needs the subconsumer name.
      $this->sname = $sname;
      parent::__construct($cname, $qname, $argc, $argv, $src_constr);
    }

    /**
     * Registers this subconsumer with pgq_coop.register_subconsumer().
     *
     * @return bool true when the function returned "1"; false when the
     *              query failed or any other value came back
     */
    protected function register()
    {
      $sql = sprintf(
        "SELECT pgq_coop.register_subconsumer('%s', '%s', '%s');",
        pg_escape_string($this->pg_src_con, $this->qname),
        pg_escape_string($this->pg_src_con, $this->cname),
        pg_escape_string($this->pg_src_con, $this->sname)
      );

      $this->log->verbose("%s", $sql);
      $r = pg_query($this->pg_src_con, $sql);
      if ($r === false) {
        $this->log->warning(
          "Could not register subconsumer '%s' of '%s' to queue '%s'",
          $this->sname, $this->cname, $this->qname
        );
        return false;
      }

      $registered = pg_fetch_result($r, 0, 0);
      if ($registered == "1") {
        return true;
      }

      $this->log->fatal("Register SubConsumer failed (%d).", $registered);
      return false;
    }

    /**
     * Unregister PGQ subconsumer. Called from stop().
     *
     * Runs pgq_coop.unregister_subconsumer() with a literal 0 as its last
     * argument.
     * NOTE(review): that argument looks like the batch-handling mode -
     * confirm its meaning against the pgq_coop docs.
     *
     * @return bool true when the function returned "1"; false when the
     *              query failed or any other value came back
     */
    public function unregister()
    {
      // The names must come from the object's properties: plain $qname,
      // $cname and $sname are undefined in this scope and would send
      // three empty strings to the database.
      $sql = sprintf(
        "SELECT pgq_coop.unregister_subconsumer('%s', '%s', '%s', 0);",
        pg_escape_string($this->pg_src_con, $this->qname),
        pg_escape_string($this->pg_src_con, $this->cname),
        pg_escape_string($this->pg_src_con, $this->sname)
      );

      $this->log->verbose("%s", $sql);
      $r = pg_query($this->pg_src_con, $sql);
      if ($r === false) {
        $this->log->fatal(
          "Could not unregister subconsumer '%s' of '%s' to queue '%s'",
          $this->sname, $this->cname, $this->qname
        );
        return false;
      }

      $unregistered = pg_fetch_result($r, 0, 0);
      if ($unregistered == "1") {
        return true;
      }

      $this->log->fatal("Unregister SubConsumer failed (%d).", $unregistered);
      return false;
    }

    /**
     * Asks pgq_coop.next_batch() for the next batch of this subconsumer.
     * The 4-argument form is used only when $this->timeout is set.
     *
     * @return mixed the first column of the first result row as returned
     *               by pg_fetch_result() (null for SQL NULL), or false
     *               when the query failed
     */
    protected function next_batch()
    {
      if ($this->timeout !== null) {
        // The timeout ends up inside a quoted SQL literal, so it is
        // escaped like the other interpolated values.
        $sql = sprintf(
          "SELECT pgq_coop.next_batch('%s', '%s', '%s', '%s')",
          pg_escape_string($this->pg_src_con, $this->qname),
          pg_escape_string($this->pg_src_con, $this->cname),
          pg_escape_string($this->pg_src_con, $this->sname),
          pg_escape_string($this->pg_src_con, (string)$this->timeout)
        );
      } else {
        $sql = sprintf(
          "SELECT pgq_coop.next_batch('%s', '%s', '%s')",
          pg_escape_string($this->pg_src_con, $this->qname),
          pg_escape_string($this->pg_src_con, $this->cname),
          pg_escape_string($this->pg_src_con, $this->sname)
        );
      }

      $this->log->verbose("%s", $sql);
      $r = pg_query($this->pg_src_con, $sql);
      if ($r === false) {
        $this->log->error("Could not get next batch");
        return false;
      }

      $batch_id = pg_fetch_result($r, 0, 0);
      $this->log->debug(
        "Get batch_id %s (isnull=%s)",
        $batch_id,
        ($batch_id === null ? "True" : "False")
      );
      return $batch_id;
    }

    /**
     * Closes a batch with pgq_coop.finish_batch().
     *
     * @param mixed $batch_id batch identifier; cast to int before use
     * @return bool true when the query ran, false when it failed
     */
    protected function finish_batch($batch_id)
    {
      $sql = sprintf("SELECT pgq_coop.finish_batch(%d);", (int)$batch_id);

      $this->log->verbose("%s", $sql);
      if (pg_query($this->pg_src_con, $sql) === false) {
        $this->log->error("Could not finish batch %d", (int)$batch_id);
        return false;
      }
      return true;
    }
  }
Add Comment
Please, Sign In to add comment