Don't like ads? PRO users don't see any ads ;-)
Guest

Untitled

By: a guest on Jun 17th, 2012  |  syntax: None  |  size: 1.74 KB  |  hits: 15  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. package MyApp::Model;
  2. use Moose;
  3. use JSON::XS;
  4. use AnyEvent::RabbitMQ;
  5. use MooseX::ClassAttribute;
  # Class-level attribute (declared with MooseX::ClassAttribute's class_has):
  # read-only, and populated by the _build_queue builder below.
  class_has queue => (
      is      => 'ro',
      builder => '_build_queue',
  );

  # Per-instance read/write slot. channel() stores the channel it opens here
  # and hands the stored value to later callers.
  has _channel => (
      is => 'rw',
  );
  8.  
  # Builder for the class-level `queue` attribute.
  #
  # Connects to the broker at localhost:5672 as guest/guest on vhost "/",
  # opens a channel on that connection, declares the durable queue
  # "myapp.jobs", and blocks on a condition variable until the declaration
  # has been acknowledged.
  #
  # Takes no arguments that it uses (the invocant is ignored).
  #
  # Returns the value returned by AnyEvent::RabbitMQ's connect() call; the
  # rest of this class calls open_channel() on it.
  #
  # Dies from recv() if the connect, open_channel or declare_queue step
  # reports a failure. Without explicit failure callbacks the condition
  # variable would never be signalled in that case, so the failure could not
  # be reported to the caller of this builder.
  sub _build_queue {
      my $ready = AE::cv;

      # Factory for per-stage failure callbacks. Each one forwards whatever
      # the library passed (NOTE(review): this may be a plain message or a
      # frame object, which is stringified as-is) into the condvar as an
      # exception, so that recv() below throws it.
      my $fail_at = sub {
          my ($stage) = @_;
          return sub {
              my $detail = join ' ', grep { defined } @_;
              $ready->croak("RabbitMQ $stage failed: $detail");
          };
      };

      my $connection = AnyEvent::RabbitMQ->new->load_xml_spec->connect(
          host       => 'localhost',
          port       => 5672,
          user       => 'guest',
          pass       => 'guest',
          vhost      => '/',
          on_failure => $fail_at->('connect'),
          on_success => sub {
              my ($conn) = @_;
              $conn->open_channel(
                  on_failure => $fail_at->('open_channel'),
                  on_success => sub {
                      my ($channel) = @_;
                      $channel->declare_queue(
                          queue      => 'myapp.jobs',
                          durable    => 1,
                          on_failure => $fail_at->('declare_queue'),

                          # The condvar itself is the success callback:
                          # invoking it signals the recv() below.
                          on_success => $ready,
                      );
                  },
              );
          },
      );

      # Block until the queue is declared, or throw if any stage failed.
      $ready->recv;
      return $connection;
  }
  33.  
  # Hand a channel to the callback $cb.
  #
  # If a channel is already stored in _channel, $cb is called with it
  # immediately and its result is returned. Otherwise a new channel is
  # requested from the shared connection (`queue`); once that succeeds the
  # channel is stored in _channel and passed on to $cb. In that case the
  # return value is whatever open_channel() returned.
  sub channel {
      my ( $self, $cb ) = @_;

      my $existing = $self->_channel;
      return $cb->($existing) if $existing;

      # Remember the freshly opened channel, then forward the same
      # arguments to the caller's callback.
      my $store_and_forward = sub {
          $self->_channel(@_);
          return $cb->(@_);
      };

      return $self->queue->open_channel( on_success => $store_and_forward );
  }
  40.  
  # Publish $body with routing key "myapp.jobs".
  #
  # $body is serialised with encode_json at the moment the channel becomes
  # available, then passed to the channel's publish() method. Returns
  # whatever channel() returns.
  sub publish {
      my ( $self, $body ) = @_;

      my $send_job = sub {
          my ($channel) = @_;
          return $channel->publish(
              routing_key => 'myapp.jobs',
              body        => encode_json($body),
          );
      };

      return $self->channel($send_job);
  }
  52.  
  # Declare a durable queue without specifying a name and return a condition
  # variable for the outcome.
  #
  # Returns an AnyEvent condvar. It is used as the on_success callback of
  # declare_queue(), so it is signalled with whatever arguments the library
  # passes on success. If the declaration fails, the condvar is croaked, so a
  # caller blocked in recv() gets an exception instead of the failure never
  # reaching the condvar.
  sub new_queue {
      my $self     = shift;
      my $declared = AE::cv;

      $self->channel(
          sub {
              my ($channel) = @_;
              $channel->declare_queue(
                  durable    => 1,
                  on_success => $declared,
                  on_failure => sub {
                      $declared->croak( join ' ',
                          'RabbitMQ declare_queue failed:',
                          grep { defined } @_ );
                  },
              );
          }
      );

      return $declared;
  }
  60.  
  # Start consuming from the queue named $queue (no_ack => 1) and return a
  # condition variable tied to the subscription.
  #
  # Returns an AnyEvent condvar. It is installed as the on_consume callback,
  # so it is signalled with whatever arguments the library passes on
  # delivery. If the consume request itself fails, the condvar is croaked, so
  # a caller blocked in recv() gets an exception instead of the failure never
  # reaching the condvar.
  #
  # NOTE(review): the same condvar is invoked for every delivery, so as
  # written this looks suited to waiting for a single message -- confirm
  # against callers.
  sub consume {
      my ( $self, $queue ) = @_;
      my $delivered = AE::cv;

      $self->channel(
          sub {
              my ($channel) = @_;
              $channel->consume(
                  queue      => $queue,
                  on_consume => $delivered,
                  no_ack     => 1,
                  on_failure => sub {
                      $delivered->croak( join ' ',
                          'RabbitMQ consume failed:',
                          grep { defined } @_ );
                  },
              );
          }
      );

      return $delivered;
  }