Advertisement
Guest User

ZendX_Console_Process_Unix

a guest
Mar 10th, 2010
2,778
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
PHP 20.06 KB | None | 0 0
  1. <?php
  2. /**
  3.  * Zend Framework
  4.  *
  5.  * LICENSE
  6.  *
  7.  * This source file is subject to the new BSD license that is bundled
  8.  * with this package in the file LICENSE.txt.
  9.  * It is also available through the world-wide-web at this URL:
  10.  * http://framework.zend.com/license/new-bsd
  11.  * If you did not receive a copy of the license and are unable to
  12.  * obtain it through the world-wide-web, please send an email
  13.  * to license@zend.com so we can send you a copy immediately.
  14.  *
  15.  * @category  ZendX
  16.  * @package   ZendX_Console
  17.  * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
  18.  * @license   http://framework.zend.com/license/new-bsd     New BSD License
  19.  * @version   $Id: Unix.php 20165 2010-01-09 18:57:56Z bkarwin $
  20.  */
  21.  
  22.  
  23. /**
  24.  * ZendX_Console_Process_Unix allows you to spawn a class as a separate process
  25.  *
  26.  * @category  ZendX
  27.  * @package   ZendX_Console
  28.  * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
  29.  * @license   http://framework.zend.com/license/new-bsd     New BSD License
  30.  */
  abstract class ZendX_Console_Process_Unix
  {
      /**
       * Call type: the parent signals the child and does not wait for a result.
       */
      const VOID_METHOD = 'void_method';

      /**
       * Call type: the parent blocks until the child has stored a result.
       *
       * Must differ from VOID_METHOD, otherwise both call types are
       * indistinguishable in every comparison and switch below.
       */
      const RETURN_METHOD = 'return_method';

      /**
       * Unique thread name
       *
       * @var string
       */
      private $_name;

      /**
       * PID of the child process
       *
       * @var integer
       */
      private $_pid = null;

      /**
       * UID of the child process owner
       *
       * @var integer
       */
      private $_puid = null;

      /**
       * GID of the child process owner
       *
       * @var integer
       */
      private $_guid = null;

      /**
       * Whether the process has been forked yet or not
       *
       * @var boolean
       */
      private $_isRunning = false;

      /**
       * Whether we are inside the child process or not
       *
       * @var boolean
       */
      private $_isChild = false;

      /**
       * A data structure to hold data for Inter Process Communications
       *
       * @var array
       */
      private $_internalIpcData = array();

      /**
       * Handle of the shared memory segment that carries the IPC data.
       *
       * @var mixed Whatever shmop_open() returned
       */
      private $_internalIpcKey;

      /**
       * Handle of the small shared memory segment used as semaphore.
       *
       * Byte 0 is the "call pending" flag, byte 1 is the "transaction" flag
       * that blocks _writeToIpcSegment().
       *
       * @var mixed Whatever shmop_open() returned
       */
      private $_internalSemKey;

      /**
       * Is Shared Memory Area OK? If not, the start() method will throw.
       * Otherwise we'd have a running child without any communication channel.
       *
       * @var boolean
       */
      private $_ipcIsOkay;

      /**
       * Filename of the IPC segment file
       *
       * @var string
       */
      private $_ipcSegFile;

      /**
       * Filename of the semaphore file
       *
       * @var string
       */
      private $_ipcSemFile;

      /**
       * Constructor method
       *
       * Allocates a new pseudo-thread object. Optionally, set a PUID, a GUID and
       * a UMASK for the child process. This also initializes the Shared Memory
       * Segments for process communications.
       *
       * @param  integer $puid
       * @param  integer $guid
       * @param  integer $umask
       * @throws ZendX_Console_Process_Exception When running on windows
       * @throws ZendX_Console_Process_Exception When running in web environment
       * @throws ZendX_Console_Process_Exception When shmop_* functions don't exist
       * @throws ZendX_Console_Process_Exception When pcntl_* functions don't exist
       * @throws ZendX_Console_Process_Exception When posix_* functions don't exist
       */
      public function __construct($puid = null, $guid = null, $umask = null)
      {
          if (substr(PHP_OS, 0, 3) === 'WIN') {
              require_once 'ZendX/Console/Process/Exception.php';
              throw new ZendX_Console_Process_Exception('Cannot run on windows');
          } else if (!in_array(substr(PHP_SAPI, 0, 3), array('cli', 'cgi'))) {
              require_once 'ZendX/Console/Process/Exception.php';
              throw new ZendX_Console_Process_Exception('Can only run on CLI or CGI enviroment');
          } else if (!function_exists('shmop_open')) {
              require_once 'ZendX/Console/Process/Exception.php';
              throw new ZendX_Console_Process_Exception('shmop_* functions are required');
          } else if (!function_exists('pcntl_fork')) {
              require_once 'ZendX/Console/Process/Exception.php';
              throw new ZendX_Console_Process_Exception('pcntl_* functions are required');
          } else if (!function_exists('posix_kill')) {
              require_once 'ZendX/Console/Process/Exception.php';
              throw new ZendX_Console_Process_Exception('posix_* functions are required');
          }

          $this->_isRunning = false;

          $this->_name = md5(uniqid(rand()));
          $this->_guid = $guid;
          $this->_puid = $puid;

          if ($umask !== null) {
              umask($umask);
          }

          // Try to create the shared memory segments. $this->_ipcIsOkay holds
          // the outcome and is checked by start() before forking.
          $this->_ipcIsOkay = false;

          if ($this->_createIpcSegment()) {
              try {
                  $semaphoreCreated = $this->_createIpcSemaphore();
              } catch (Exception $e) {
                  // Do not leave the data segment behind when the semaphore
                  // could not even be keyed.
                  $this->_discardIpcSegment();
                  throw $e;
              }

              if ($semaphoreCreated) {
                  $this->_ipcIsOkay = true;
              } else {
                  // Half an IPC channel is useless; release the data segment
                  // and its key file instead of leaking them.
                  $this->_discardIpcSegment();
              }
          }
      }

      /**
       * Stop the child on destruction
       */
      public function __destruct()
      {
          if ($this->isRunning()) {
              $this->stop();
          }
      }

      /**
       * Causes this pseudo-thread to begin parallel execution.
       *
       * This method first checks the Shared Memory Segments. If okay, it
       * forks the child process, attaches the signal handler and returns
       * immediately. The status is set to running, and a PID is assigned. The
       * result is that two pseudo-threads are running concurrently: the current
       * thread (which returns from the call to the start() method) and the
       * other thread (which executes its _run() method).
       *
       * @throws ZendX_Console_Process_Exception When SHM segments can't be created
       * @throws ZendX_Console_Process_Exception When process forking fails
       * @return void
       */
      public function start()
      {
          if (!$this->_ipcIsOkay) {
              require_once 'ZendX/Console/Process/Exception.php';
              throw new ZendX_Console_Process_Exception('Unable to create SHM segments for process communications');
          }

          // @see http://www.php.net/manual/en/function.pcntl-fork.php#41150
          @ob_end_flush();

          pcntl_signal(SIGCHLD, SIG_IGN);

          $pid = @pcntl_fork();
          if ($pid === -1) {
              require_once 'ZendX/Console/Process/Exception.php';
              throw new ZendX_Console_Process_Exception('Forking process failed');
          } else if ($pid === 0) {
              // This is the child
              $this->_isChild = true;

              // Sleep a second to avoid problems
              sleep(1);

              // Install the signal handler.
              // NOTE(review): nothing in this class declares ticks or calls
              // pcntl_signal_dispatch(), and _sigHandler() is private; confirm
              // that the handler is actually invoked in the target runtime.
              pcntl_signal(SIGUSR1, array($this, '_sigHandler'));

              // If requested, change process identity (group first, because
              // dropping the user id first could forbid changing the group)
              if ($this->_guid !== null) {
                  posix_setgid($this->_guid);
              }

              if ($this->_puid !== null) {
                  posix_setuid($this->_puid);
              }

              // Run the child
              try {
                  $this->_run();
              } catch (Exception $e) {
                  // Deliberately swallowed: the child must always reach the
                  // exit() below instead of unwinding into the parent's code.
              }

              // Destroy the child after _run() execution. Required to avoid
              // useless child processes after execution
              exit(0);
          } else {
              // Else this is the parent
              $this->_isChild   = false;
              $this->_isRunning = true;
              $this->_pid       = $pid;
          }
      }

      /**
       * Causes the current thread to die.
       *
       * The child process is sent SIGKILL, and the shared memory segments and
       * their key files are released.
       *
       * NOTE(review): the wait is non-blocking (WNOHANG), so $status may still
       * hold its initial 0 when it is inspected; confirm what callers expect
       * the return value to mean.
       *
       * @return boolean Result of pcntl_wifexited() on the collected status;
       *                 false when no child PID is known
       */
      public function stop()
      {
          $success = false;

          if ($this->_pid > 0) {
              $status = 0;

              posix_kill($this->_pid, SIGKILL);
              pcntl_waitpid($this->_pid, $status, WNOHANG);
              $success = pcntl_wifexited($status);
              $this->_cleanProcessContext();
          }

          return $success;
      }

      /**
       * Test if the pseudo-thread is already started.
       *
       * @return boolean
       */
      public function isRunning()
      {
          return $this->_isRunning;
      }

      /**
       * Set a variable into the shared memory segment, so that it can be
       * accessed both from the parent and from the child process. Variable
       * names beginning with an underscore are reserved for internal functions.
       *
       * @param  string $name
       * @param  mixed  $value
       * @throws ZendX_Console_Process_Exception When an invalid variable name is supplied
       * @return void
       */
      public function setVariable($name, $value)
      {
          // substr() instead of $name[0]: an empty name must not raise an
          // "uninitialized string offset" warning.
          if (substr((string) $name, 0, 1) === '_') {
              require_once 'ZendX/Console/Process/Exception.php';
              throw new ZendX_Console_Process_Exception('Only internal functions may use underline (_) as variable prefix');
          }

          $this->_writeVariable($name, $value);
      }

      /**
       * Get a variable from the shared memory segment. Returns NULL if the
       * variable doesn't exist.
       *
       * @param  string $name
       * @return mixed
       */
      public function getVariable($name)
      {
          // Refresh the local copy first so values written by the other
          // process are visible.
          $this->_readFromIpcSegment();

          if (isset($this->_internalIpcData[$name])) {
              return $this->_internalIpcData[$name];
          }

          return null;
      }

      /**
       * Read the time elapsed since the last child _setAlive() call.
       *
       * This method is useful because often we have a pseudo-thread pool and we
       * need to know each pseudo-thread status. If the child executes the
       * _setAlive() method, the parent with getLastAlive() can know that the
       * child is alive.
       *
       * @return integer Seconds since the last ping, or 0 when none was recorded
       */
      public function getLastAlive()
      {
          $pingTime = $this->getVariable('_pingTime');

          return ($pingTime === null ? 0 : (time() - $pingTime));
      }

      /**
       * Returns the PID of the current pseudo-thread.
       *
       * @return integer|null NULL until start() succeeded or after stop()
       */
      public function getPid()
      {
          return $this->_pid;
      }

      /**
       * Set a pseudo-thread property that can be read from the parent process
       * in order to know the child activity.
       *
       * Practical usage requires that the child process calls this method at
       * regular time intervals; the parent will use the getLastAlive() method
       * to know the elapsed time since the last pseudo-thread life signal.
       *
       * @return void
       */
      protected function _setAlive()
      {
          $this->_writeVariable('_pingTime', time());
      }

      /**
       * This is called from within the parent; all the communication stuff
       * is done here.
       *
       * The request (call type, method name, argument list) is stored in the
       * shared segment and the child is notified with SIGUSR1. For
       * RETURN_METHOD calls the parent then blocks on the semaphore and hands
       * back the value the child stored under '_callOutput'.
       *
       * @param  string $methodName
       * @param  array  $argList
       * @param  string $type       Either self::VOID_METHOD or self::RETURN_METHOD
       * @return mixed              The child's result for RETURN_METHOD, else NULL
       */
      protected function _callCallbackMethod($methodName, array $argList = array(), $type = self::VOID_METHOD)
      {
          // Anything that is not explicitly a RETURN_METHOD call is treated as
          // a VOID_METHOD call.
          $callType = ($type === self::RETURN_METHOD) ? self::RETURN_METHOD : self::VOID_METHOD;

          $this->_internalIpcData['_callType']   = $callType;
          $this->_internalIpcData['_callMethod'] = $methodName;
          $this->_internalIpcData['_callInput']  = $argList;

          // Write the IPC data to the shared segment
          $this->_writeToIpcSegment();

          switch ($callType) {
              case self::VOID_METHOD:
                  // Notify the child so it can process the request
                  $this->_sendSigUsr1();
                  break;

              case self::RETURN_METHOD:
                  // Raise the "call pending" flag (byte 0 of the semaphore)
                  shmop_write($this->_internalSemKey, '1', 0);

                  // Notify the child so it can process the request
                  $this->_sendSigUsr1();

                  // Block until the child has cleared the flag again
                  $this->_waitForIpcSemaphore();

                  // Read from the SHM segment. The result is stored into
                  // $this->_internalIpcData['_callOutput']
                  $this->_readFromIpcSegment();

                  // Result fetched; clear the transaction flag (byte 1) so
                  // that _writeToIpcSegment() works again
                  shmop_write($this->_internalSemKey, '0', 1);

                  return $this->_internalIpcData['_callOutput'];
          }

          return null;
      }

      /**
       * This method actually implements the pseudo-thread logic.
       *
       * @return void
       */
      abstract protected function _run();

      /**
       * Sends SIGUSR1 to the child process, if a child PID is known.
       *
       * @return void
       */
      private function _sendSigUsr1()
      {
          if ($this->_pid > 0) {
              posix_kill($this->_pid, SIGUSR1);
          }
      }

      /**
       * Actually write a variable to the shared memory segment
       *
       * @param  string $name
       * @param  mixed  $value
       * @return void
       */
      private function _writeVariable($name, $value)
      {
          $this->_internalIpcData[$name] = $value;
          $this->_writeToIpcSegment();
      }

      /**
       * Destroy thread context and free relative resources.
       *
       * @return void
       */
      private function _cleanProcessContext()
      {
          // Mark both segments for deletion before detaching from them
          shmop_delete($this->_internalIpcKey);
          shmop_delete($this->_internalSemKey);

          shmop_close($this->_internalIpcKey);
          shmop_close($this->_internalSemKey);

          @unlink($this->_ipcSegFile);
          @unlink($this->_ipcSemFile);

          $this->_isRunning = false;
          $this->_pid       = null;
      }

      /**
       * Release the data segment and its key file.
       *
       * Used by the constructor when the semaphore segment could not be
       * created after the data segment already had been.
       *
       * @return void
       */
      private function _discardIpcSegment()
      {
          shmop_delete($this->_internalIpcKey);
          shmop_close($this->_internalIpcKey);

          @unlink($this->_ipcSegFile);
      }

      /**
       * This is the signal handler that makes the communications between client
       * and server possible.
       *
       * On SIGUSR1 the pending request is read from the shared segment and the
       * requested method is invoked on this object. The argument list is
       * handed over as ONE array argument.
       *
       * @param  integer $signo
       * @return void
       */
      private function _sigHandler($signo)
      {
          switch ($signo) {
              case SIGTERM:
                  // Handle shutdown tasks. No break required, exit never returns
                  exit;

              case SIGUSR1:
                  // This is the user-defined signal we use. Read the SHM segment
                  $this->_readFromIpcSegment();

                  if (isset($this->_internalIpcData['_callType'])) {
                      $method = $this->_internalIpcData['_callMethod'];
                      $params = $this->_internalIpcData['_callInput'];

                      switch ($this->_internalIpcData['_callType']) {
                          case self::VOID_METHOD:
                              // Simply call the (void) method and return
                              // immediately; the parent is not waiting, so the
                              // processing is async
                              call_user_func(array($this, $method), $params);
                              break;

                          case self::RETURN_METHOD:
                              // Process the request
                              $this->_internalIpcData['_callOutput'] = call_user_func(array($this, $method), $params);

                              // Write the result into the IPC segment
                              $this->_writeToIpcSegment();

                              // Unlock the semaphore (byte 0) but block
                              // _writeToIpcSegment() (byte 1) until the parent
                              // has fetched the result
                              shmop_write($this->_internalSemKey, '0', 0);
                              shmop_write($this->_internalSemKey, '1', 1);
                              break;
                      }
                  }
                  break;

              default:
                  // Ignore all other signals
                  break;
          }
      }

      /**
       * Wait for IPC Semaphore
       *
       * Polls byte 0 of the semaphore segment until the child has reset it.
       *
       * @throws ZendX_Console_Process_Exception When reading the semaphore fails
       * @return void
       */
      private function _waitForIpcSemaphore()
      {
          while (true) {
              $flag = shmop_read($this->_internalSemKey, 0, 1);

              if ($flag === false) {
                  // Without this check a broken segment would spin forever
                  require_once 'ZendX/Console/Process/Exception.php';
                  throw new ZendX_Console_Process_Exception('Fatal error while reading SHM segment');
              }

              // shmop_read() returns a string, so compare against the
              // character written by shmop_write(), not the integer 0
              if ($flag === '0') {
                  break;
              }

              usleep(10);
          }
      }

      /**
       * Read data from IPC segment
       *
       * The whole segment is read and unserialized. When the content cannot be
       * unserialized the local copy is left untouched.
       *
       * @throws ZendX_Console_Process_Exception When reading of SHM segment fails
       * @return void
       */
      private function _readFromIpcSegment()
      {
          $serializedIpcData = shmop_read($this->_internalIpcKey,
                                          0,
                                          shmop_size($this->_internalIpcKey));

          if ($serializedIpcData === false) {
              require_once 'ZendX/Console/Process/Exception.php';
              throw new ZendX_Console_Process_Exception('Fatal error while reading SHM segment');
          }

          // Suppressed on purpose: a segment nobody has written to yet does
          // not contain valid serialized data
          $data = @unserialize($serializedIpcData);

          if ($data !== false) {
              $this->_internalIpcData = $data;
          }
      }

      /**
       * Write data to IPC segment
       *
       * @throws ZendX_Console_Process_Exception When writing of SHM segment fails
       * @return void
       */
      private function _writeToIpcSegment()
      {
          // Read the transaction flag (byte 1 of the semaphore segment). If its
          // value is '1', a RETURN_METHOD result is waiting to be fetched by
          // the parent, so we must not overwrite the segment (data corruption).
          // shmop_read() returns a string, hence the comparison with '1'.
          if (shmop_read($this->_internalSemKey, 1, 1) === '1') {
              return;
          }

          $serializedIpcData = serialize($this->_internalIpcData);

          // Set the exchange array (IPC) into the shared segment
          $shmBytesWritten = shmop_write($this->_internalIpcKey,
                                         $serializedIpcData,
                                         0);

          // Check if the SHM segment is large enough to contain the data
          if ($shmBytesWritten !== strlen($serializedIpcData)) {
              require_once 'ZendX/Console/Process/Exception.php';
              throw new ZendX_Console_Process_Exception('Fatal error while writing to SHM segment');
          }
      }

      /**
       * Create an IPC segment
       *
       * Creates a key file in the system temp directory, derives a System V
       * key from it and opens a 10240 byte shared memory segment.
       *
       * @throws ZendX_Console_Process_Exception When no key can be derived for the SHM segment
       * @return boolean False when the segment could not be opened
       */
      private function _createIpcSegment()
      {
          $this->_ipcSegFile = realpath(sys_get_temp_dir()) . '/' . rand() . $this->_name . '.shm';
          touch($this->_ipcSegFile);

          $shmKey = ftok($this->_ipcSegFile, 't');
          if ($shmKey === -1) {
              // Do not leave the key file behind
              @unlink($this->_ipcSegFile);

              require_once 'ZendX/Console/Process/Exception.php';
              throw new ZendX_Console_Process_Exception('Could not create SHM segment');
          }

          $this->_internalIpcKey = @shmop_open($shmKey, 'c', 0644, 10240);

          if (!$this->_internalIpcKey) {
              @unlink($this->_ipcSegFile);
              return false;
          }

          return true;
      }

      /**
       * Create IPC semaphore
       *
       * Creates a key file in the system temp directory, derives a System V
       * key from it and opens a 10 byte shared memory segment that serves as
       * semaphore.
       *
       * @throws ZendX_Console_Process_Exception When no key can be derived for the semaphore
       * @return boolean False when the segment could not be opened
       */
      private function _createIpcSemaphore()
      {
          $this->_ipcSemFile = realpath(sys_get_temp_dir()) . '/' . rand() . $this->_name . '.sem';
          touch($this->_ipcSemFile);

          $semKey = ftok($this->_ipcSemFile, 't');
          if ($semKey === -1) {
              // Do not leave the key file behind
              @unlink($this->_ipcSemFile);

              require_once 'ZendX/Console/Process/Exception.php';
              throw new ZendX_Console_Process_Exception('Could not create semaphore');
          }

          $this->_internalSemKey = @shmop_open($semKey, 'c', 0644, 10);

          if (!$this->_internalSemKey) {
              @unlink($this->_ipcSemFile);
              return false;
          }

          return true;
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement