Advertisement
Guest User

maestrojed

a guest
Jan 26th, 2010
147
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
PHP 4.72 KB | None | 0 0
  1. <?php
  2. require_once('../lib/Phirehose.php');
  3. /**
  4.  * Example of using Phirehose to collect tweets into a "ghetto queue" (i.e. a simple, filesystem-based queue).
  5.  * This is not designed to be a production-ready/scalable collection system but is simple and does not rely on any
  6.  * additional software or PHP modules.
  7.  *
  8.  * The idea with the ghetto queue is that a file is opened to collect tweets and is rotated periodically so it can be processed by
  9.  * a separate consumer process. If you need "live" processing (e.g. realtime auto-responses to tweets) you'd want to
  10.  * keep the rotation interval low - e.g. 5-10 seconds. If you're doing analytics, you should rotate less often (perhaps every hour or so).
  11.  *
  12.  */
  13. class GhettoQueueCollector extends Phirehose
  14. {
  15.  
  16.   /**
  17.    * Subclass specific constants
  18.    */
  19.   const QUEUE_FILE_PREFIX = 'phirehose-ghettoqueue';
  20.   const QUEUE_FILE_ACTIVE = '.phirehose-ghettoqueue.current';
  21.  
  22.   /**
  23.    * Member attributes specific to this subclass
  24.    */
  25.   protected $queueDir;
  26.   protected $rotateInterval;
  27.   protected $streamFile;
  28.   protected $statusStream;
  29.   protected $lastRotated;
  30.  
  31.   /**
  32.    * Overidden constructor to take class-specific parameters
  33.    *
  34.    * @param string $username
  35.    * @param string $password
  36.    * @param string $queueDir
  37.    * @param integer $rotateInterval
  38.    */
  39.   public function __construct($username, $password, $queueDir = '/tmp', $rotateInterval = 10)
  40.   {
  41.    
  42.     // Sanity check
  43.     if ($rotateInterval < 5) {
  44.       throw new Exception('Rotate interval set too low - Must be >= 5 seconds');
  45.     }
  46.    
  47.     // Set subclass parameters
  48.     $this->queueDir = $queueDir;
  49.     $this->rotateInterval = $rotateInterval;
  50.    
  51.     // Call parent constructor
  52.     return parent::__construct($username, $password, Phirehose::METHOD_FILTER);
  53.   }
  54.  
  55.   /**
  56.    * Enqueue each status
  57.    *
  58.    * @param string $status
  59.    */
  60.   public function enqueueStatus($status)
  61.   {
  62.    
  63.     // Write the status to the stream (must be via getStream())
  64.     fputs($this->getStream(), $status);
  65.  
  66.     /* Are we due for a file rotate? Note this won't be called if there are no statuses coming through -
  67.      * This is (probably) a good thing as it means the collector won't needlessly rotate empty files. That said, if
  68.      * you have a very sparse/quiet stream that you need highly regular analytics on, this may not work for you.
  69.      */
  70.     $now = time();
  71.     if (($now - $this->lastRotated) > $this->rotateInterval) {
  72.       // Mark last rotation time as now
  73.       $this->lastRotated = $now;
  74.      
  75.       // Rotate it
  76.       $this->rotateStreamFile();
  77.     }
  78.    
  79.   }
  80.  
  81.   /**
  82.    * Returns a stream resource for the current file being written/enqueued to
  83.    *
  84.    * @return resource
  85.    */
  86.   private function getStream()
  87.   {
  88.     // If we have a valid stream, return it
  89.     if (is_resource($this->statusStream)) {
  90.       return $this->statusStream;
  91.     }
  92.    
  93.     // If it's not a valid resource, we need to create one
  94.     if (!is_dir($this->queueDir) || !is_writable($this->queueDir)) {
  95.       throw new Exception('Unable to write to queueDir: ' . $this->queueDir);
  96.     }
  97.  
  98.     // Construct stream file name, log and open
  99.     $this->streamFile = $this->queueDir . '/' . self::QUEUE_FILE_ACTIVE;
  100.     $this->log('Opening new active status stream: ' . $this->streamFile);
  101.     $this->statusStream = fopen($this->streamFile, 'a'); // Append if present (crash recovery)
  102.    
  103.     // Ok?
  104.     if (!is_resource($this->statusStream)) {
  105.       throw new Exception('Unable to open stream file for writing: ' . $this->streamFile);
  106.     }
  107.    
  108.     // If we don't have a last rotated time, it's effectively now
  109.     if ($this->lastRotated == NULL) {
  110.       $this->lastRotated = time();
  111.     }
  112.    
  113.     // Looking good, return the resource
  114.     return $this->statusStream;
  115.    
  116.   }
  117.  
  118.   /**
  119.    * Rotates the stream file if due
  120.    */
  121.   private function rotateStreamFile()
  122.   {
  123.     // Close the stream
  124.     fclose($this->statusStream);
  125.    
  126.     // Create queue file with timestamp so they're both unique and naturally ordered
  127.     $queueFile = $this->queueDir . '/' . self::QUEUE_FILE_PREFIX . '.' . date('Ymd-His') . '.queue';
  128.    
  129.     // Do the rotate
  130.     rename($this->streamFile, $queueFile);
  131.    
  132.     // Did it work?
  133.     if (!file_exists($queueFile)) {
  134.       throw new Exception('Failed to rotate queue file to: ' . $queueFile);
  135.     }
  136.    
  137.     // At this point, all looking good - the next call to getStream() will create a new active file
  138.     $this->log('Successfully rotated active stream to queue file: ' . $queueFile);
  139.   }
  140.  
  141. } // End of class
  142.  
  143. // Start streaming/collecting
  144. $sc = new GhettoQueueCollector('username', 'password');
  145. $sc->setTrack(array('morning', 'goodnight', 'hello', 'the', 'and'));
  146. $sc->consume();
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement