Advertisement
Guest User

MultiAccess

a guest
Sep 22nd, 2010
87
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
PHP 7.24 KB | None | 0 0
  1. <?php
  2.  
  3. class MultiAccess
  4. {
  5.     public $read_q_key = 10081;
  6.     public $write_q_key = 10082;
  7.     public $writers_mutex_key = 10083;
  8.     public $err_log = array();
  9.     public $log = array();
  10.     private $read_q;
  11.     private $write_q;
  12.     private $writers_mutex;
  13.     private $writers_count = 0;
  14.     private $readers_count = 0;
  15.     const readers = 1;
  16.     const writers = 2;
  17.     const wait_time = 1;
  18.     const max_readers_queue = 100;
  19.     const max_writers_queue = 100;
  20.  
  21.     public function get_access_read(&$auto_unlocker_reference=-1)
  22.     {
  23.         if (($this->writers_count <= 0) && ($this->readers_count <= 0)) //if it is nested call - access should be given only once
  24.         {
  25.             $writers = $this->get_counter(self::writers); //if somebody are writing here now - we will wait
  26.             if ($writers > 0)
  27.             {
  28.                 $readers = $this->get_counter(self::readers);
  29.                 if ($readers < self::max_readers_queue) $this->wait(self::writers); //if some writer become freezed - don't wait too long
  30.             }
  31.             $sent = $this->increment(); //increment count of readers - writers will be waiting for us while we read.
  32.             if ($sent === false)
  33.             {
  34.                 $this->err_log[] = "gar: not incremented";
  35.                 return false;
  36.             }
  37.         }
  38.  
  39.         if ($auto_unlocker_reference!=-1)
  40.         {
  41.             $mutex = new autoUnlocker(array($this,'release_access_read'));
  42.             $auto_unlocker_reference=$mutex;
  43.         }
  44.         $this->readers_count++;
  45.         return true;
  46.     }
  47.  
  48.     public function release_access_read($autoUnlocker=NULL)
  49.     {
  50.         if (!empty($autoUnlocker)) $autoUnlocker->revoke();
  51.         if ($this->readers_count > 0) $this->readers_count--;
  52.         if ($this->readers_count == 0) $this->decrement(self::readers); //tell to writers, that we have read and they can write now
  53.         return true;
  54.     }
  55.  
  56.     public function get_access_write(&$auto_unlocker_reference=-1)
  57.     {
  58.         if ($this->writers_count <= 0) //if we are writers already - don't need to lock semaphore again
  59.         {
  60.             if ($this->readers_count > 0) //if we got reader's access and want to write - release our reader's access and got new access - writer's (else process will wait itself for a while)
  61.             {
  62.                 while ($this->readers_count > 0) $this->release_access_read($auto_unlocker_reference);
  63.             }
  64.             //acquire mutex for writing
  65.             if (empty($this->writers_mutex)) $this->writers_mutex = sem_get($this->writers_mutex_key,1,0777,1);
  66.             if (empty($this->writers_mutex)) return false;
  67.             ignore_user_abort(true); //don't hang with semaphore, please :)
  68.             set_time_limit(3);
  69.             $starttime = microtime(true);
  70.             if (!sem_acquire($this->writers_mutex)) return false;
  71.             $acquire_time = microtime(true) - $starttime;
  72.             if ($acquire_time >= 1) $this->err_log[] = 'Acquire time: ' . round($acquire_time,4);
  73.             //tell to readers, that they should wait while we will write
  74.             $sent = $this->increment(self::writers);
  75.             if ($sent === false)
  76.             {
  77.                 $this->err_log[] = "gaw: not incremented";
  78.                 return false;
  79.             }
  80.             //but if readers has come before us - wait, until they finish
  81.             $readers = $this->get_counter(self::readers);
  82.             if ($readers > 0)
  83.             {
  84.                 $writers = $this->get_counter(self::writers);
  85.                 if ($writers < self::max_writers_queue) $this->wait(self::readers); //if some reader become freezed - don't wait too long
  86.             }
  87.         }
  88.  
  89.         if ($auto_unlocker_reference!=-1)
  90.         {
  91.             $mutex = new autoUnlocker(array($this,'release_access_write'));
  92.             $auto_unlocker_reference=$mutex;
  93.         }
  94.         $this->writers_count++;
  95.         //and now we can write :)
  96.         return true;
  97.     }
  98.  
  99.     public function release_access_write($autoUnlocker=NULL)
  100.     {
  101.         if (!empty($autoUnlocker)) $autoUnlocker->revoke();
  102.         if ($this->writers_count > 0) $this->writers_count--;
  103.         if ($this->writers_count == 0)
  104.         {
  105.             $this->decrement(self::writers); //tell to readers, that they can read now
  106.             if (empty($this->writers_mutex)) $this->writers_mutex = sem_get($this->writers_mutex_key,1,0777,1);
  107.             @sem_release($this->writers_mutex);
  108.         }
  109.         return true;
  110.     }
  111.  
  112.     private function increment($type = self::readers)
  113.     {
  114.         $q = $this->select_q($type);
  115.         if (empty($q)) return false;
  116.         $sent = msg_send($q,$type,1,false,false,$err);
  117.         if ($sent == false) return false;
  118.         return true;
  119.     }
  120.  
  121.     private function decrement($type = self::readers)
  122.     {
  123.         $q = $this->select_q($type);
  124.         if (empty($q)) return false;
  125.         $recieve = msg_receive($q,$type,$t,100,$msg,false,MSG_IPC_NOWAIT + MSG_NOERROR,$err);
  126.         if ($recieve === false) return false;
  127.         return true;
  128.     }
  129.  
  130.     public function get_counter($type = self::writers)
  131.     {
  132.         $q = $this->select_q($type);
  133.         $stat = msg_stat_queue($q);
  134.         return $stat['msg_qnum'];
  135.     }
  136.  
  137.     private function wait($type = self::writers)
  138.     {
  139.         $q = $this->select_q($type);
  140.         if (empty($q)) return false;
  141.  
  142.         $starttime = time();
  143.         do
  144.         {
  145.             $stat = msg_stat_queue($q);
  146.             if ((time() - $starttime) > self::wait_time)
  147.             {
  148.                 $this->err_log[] = 'waiting for ' . $type . "\n" . $this->printable_backtrace();
  149.                 for ($i = 0;$i < $stat['msg_qnum'];$i++) $this->decrement($type);
  150.                 break;
  151.             }
  152.         }
  153.         while ($stat['msg_qnum'] > 0);
  154.         return true;
  155.     }
  156.  
  157.     private function select_q($type)
  158.     {
  159.         if ($type == self::readers)
  160.         {
  161.             if (empty($this->read_q)) $this->read_q = msg_get_queue($this->read_q_key,0777);
  162.             $q = $this->read_q;
  163.         }
  164.         else
  165.         {
  166.             if (empty($this->write_q)) $this->write_q = msg_get_queue($this->write_q_key,0777);
  167.             $q = $this->write_q;
  168.         }
  169.         return $q;
  170.     }
  171.  
  172.     public function __construct($id = '')
  173.     {
  174.         if (!empty($id))
  175.         {
  176.             if (is_string($id))
  177.             {
  178.                 if (is_file($id) || is_dir($id)) $id = ftok($id,'I');
  179.                 else  $id = intval($id);
  180.             }
  181.             $this->read_q_key += $id;
  182.             $this->write_q_key += $id;
  183.             $this->writers_mutex_key += $id;
  184.         }
  185.     }
  186.  
  187.     public function __destruct()
  188.     {
  189.         if ($this->writers_count > 0)
  190.         {
  191.             $this->err_log[]='writers count = '.$this->writers_count;
  192.             while ($this->writers_count > 0) $this->release_access_write();
  193.         }
  194.         elseif (!empty($this->writers_mutex)) @sem_release($this->writers_mutex);
  195.         if ($this->readers_count > 0)
  196.         {
  197.             $this->err_log[]='readers count = '.$this->readers_count;
  198.             while ($this->readers_count > 0) $this->release_access_read();
  199.         }
  200.  
  201.     }
  202.  
  203.     private static function printable_backtrace()
  204.     {
  205.         $tmp = debug_backtrace();
  206.         array_shift($tmp);
  207.         $arr = array_reverse($tmp);
  208.         $str = '';
  209.         $space = $basespace = '|';
  210.         foreach ($arr as $t)
  211.         {
  212.             $str .= ' ' . $space . $t['file'] . "\t[" . $t['line'] . "]\t";
  213.             if (isset($t['class'])) $str .= $t['class'] . $t['type'];
  214.             $str .= $t['function'];
  215.             if (isset($t['args'][0]))
  216.             {
  217.                 $str .= '("';
  218.                 foreach ($t['args'] as $t_arg)
  219.                 {
  220.                     if (!is_scalar($t_arg)) $str .= '[scalar],';
  221.                     else  $str .= $t_arg . ',';
  222.                 }
  223.                 $str .= '")';
  224.             }
  225.             else  $str .= '()';
  226.             $str .= "\n";
  227.             $space .= $basespace;
  228.         }
  229.         return rtrim($str);
  230.     }
  231. }
  232.  
  233. /**
  234.  * thanks to authors of RAII!
  235.  */
  236. class autoUnlocker
  237. {
  238.     private $Unlock;
  239.  
  240.     /**
  241.      * Register function to auto-unlock mutex
  242.      * @param callback $Unlock
  243.      * @return void
  244.      */
  245.     public function __construct($Unlock)
  246.     {
  247.         if (is_callable($Unlock)) $this->Unlock=$Unlock;
  248.     }
  249.  
  250.     /**
  251.      * Revoke auto-unlocking (if mutex unlocked was "manually")
  252.      */
  253.     public function revoke()
  254.     {
  255.         unset($this->Unlock);
  256.     }
  257.  
  258.     /**
  259.      * Unlock mutex automatically
  260.      */
  261.     public function __destruct()
  262.     {
  263.         if (isset($this->Unlock)) call_user_func($this->Unlock,$this);
  264.     }
  265.  
  266. }
  267.  
  268. ?>
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement