Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package Acme::Thread;
- use Moose;
- use threads;
- use threads::shared;
# Start a new thread that builds an instance of $class, initialises it and
# then runs it.  The call returns as soon as the initialisation phase in the
# new thread is over (successfully or not).
#
# Arguments:
#   $class - class to instantiate inside the new thread
#   @args  - passed unchanged to $class->new
#
# Returns the threads object of the newly created thread.
sub spawn {
    my ($class, @args) = @_;

    # Handshake flag: the child sets it to 1 once new() and init() are done.
    my $ready : shared = 0;

    my $thread = threads->create(sub {
        # Capture failures so that the parent is always woken up, even when
        # construction or initialisation dies.
        my $self = eval {
            my $object = $class->new(@args);
            $object->init;
            $object;
        };
        my $error = $@;

        {
            # Hold the lock only while notifying.  Keeping it for the
            # duration of run() would prevent the parent from re-acquiring
            # it inside cond_wait().
            lock($ready);
            $ready = 1;
            cond_signal($ready);
        }

        die $error unless defined $self;
        $self->run;
    });

    {
        # Waiting on the predicate copes with both spurious wake-ups and a
        # child that signalled before we got here.
        lock($ready);
        cond_wait($ready) until $ready;
    }

    return $thread;
}
# Initialisation hook.  spawn() calls it inside the new thread right after
# the object has been constructed.  The base implementation does nothing;
# subclasses override it.
sub init {
    return;
}
# Main body of the thread.  spawn() calls it inside the new thread once
# init() has finished.  The base implementation does nothing; subclasses
# override it.
sub run {
    return;
}
# Terminate the calling thread by delegating to threads->exit.
sub exit {
    threads->exit();
}
- package Acme::Thread::RPC::Server;
- use Moose;
- use threads::shared;
- use Thread::Queue;
- use Data::Dumper;
- extends 'Acme::Thread';
# Queue from which checkProxyBuf() pulls pending invocation requests.
has proxyBuf => (
    is       => 'ro',
    isa      => 'Thread::Queue',
    required => 1,
);

# Objects that have been handed out to remote callers, keyed by object id.
has proxyObjectCache => (
    is       => 'ro',
    isa      => 'HashRef',
    required => 1,
    default  => sub { {} },
);

# Per-object-id counters consulted when a release request arrives.
has proxyObjectRefCount => (
    is       => 'ro',
    isa      => 'HashRef',
    required => 1,
    default  => sub { {} },
);
# Start a server thread and return a proxy for talking to it.
#
# Arguments:
#   $class - server class to instantiate in the new thread
#   @args  - constructor arguments, either a key/value list or a single
#            hash reference; they are forwarded to the constructor
#
# Returns an Acme::Thread::Proxy bound to the new thread and its queue.
sub spawn {
    my ($class, @args) = @_;

    # Accept both calling conventions so that the queue can be appended
    # without corrupting the key/value list.
    my %params = (@args == 1 && ref $args[0] eq 'HASH')
        ? %{ $args[0] }
        : @args;

    my $queue = Thread::Queue->new;

    # proxyBuf goes last: the proxy below uses this very queue, so it must
    # win over any value supplied by the caller.
    my $thread = $class->SUPER::spawn(%params, proxyBuf => $queue);

    return Acme::Thread::Proxy->new(
        proxyBuf => $queue,
        threadId => $thread->tid,
    );
}
# Install the HUP handler that drains the request queue, then run the
# parent class initialisation.
sub init {
    my $self = shift;

    my $drainQueue = sub { $self->checkProxyBuf };
    $SIG{HUP} = $drainQueue;

    return $self->SUPER::init;
}
# Drain every request currently waiting in the proxy queue and process each
# one with doRemoteInvocationRequest().
#
# Safe to call when the queue is empty, which happens whenever one call has
# already consumed the items announced by several signals.
sub checkProxyBuf {
    my ($self) = @_;

    my $queue = $self->proxyBuf;

    # Thread::Queue serialises its own operations, so no outer lock is
    # taken; dereferencing the queue object directly would tie this code to
    # the module's internal representation.  dequeue_nb() without a count
    # returns undef once the queue is empty, whereas an explicit count of
    # zero is rejected by Thread::Queue.
    my @items;
    while (defined(my $item = $queue->dequeue_nb)) {
        push @items, $item;
    }

    $self->doRemoteInvocationRequest($_) for @items;

    return;
}
# Execute one request taken from the proxy queue and publish its outcome.
#
# Arguments:
#   $item - shared hash describing the request.  Keys read here:
#             method        name of the method to call
#             args          array reference with the call arguments
#             context       undef = void, true = list, false = scalar
#             objectId      id of a cached object; absent means "call on
#                           the server object itself"
#             releaseObject when defined, drop one reference to objectId
#                           instead of calling anything
#
# On completion either $item->{returned} (array reference) or
# $item->{error} (string) is set and the hash is signalled.  Release
# requests produce no reply.  A request for 'exit' is answered first and
# then terminates this thread through $self->exit.
sub doRemoteInvocationRequest {
    my ($self, $item) = @_;

    # The requesting side waits on this very hash.
    lock(%$item);

    my $methodName = $item->{method};
    my $objectId   = $item->{objectId};

    if (defined $item->{releaseObject}) {
        return unless defined $objectId;

        my $refCounts = $self->proxyObjectRefCount;
        my $remaining = --$refCounts->{$objectId};
        if ($remaining <= 0) {
            delete $refCounts->{$objectId};
            delete $self->proxyObjectCache->{$objectId};
        }
        return;
    }

    my $object = defined $objectId
        ? $self->proxyObjectCache->{$objectId}
        : $self;

    my $isExit = defined $methodName && $methodName eq 'exit';

    my @ret : shared;
    my $error;

    #TODO gross hack for now
    unless ($isExit) {
        # Both the call and the marshalling of its results run inside the
        # eval: a failure in either must reach the caller as an error,
        # otherwise the caller would wait forever.
        my $ok = eval {
            my @callArgs = @{ $item->{args} || [] };
            my @results;

            if (!defined $item->{context}) {
                $object->$methodName(@callArgs);
            }
            elsif ($item->{context}) {
                @results = $object->$methodName(@callArgs);
            }
            else {
                @results = scalar($object->$methodName(@callArgs));
            }

            for my $result (@results) {
                my $refType = ref $result;

                if (   $refType eq ''
                    || $refType eq 'HASH'
                    || $refType eq 'ARRAY'
                    || $refType eq 'SCALAR')
                {
                    # Plain data travels by value; references have to be
                    # turned into shared copies before they may be stored
                    # in a shared array.
                    push @ret, shared_clone($result);
                    next;
                }

                # Anything else stays in this thread and is represented by
                # a proxy on the calling side.
                my $id = $result->object_id;
                $self->proxyObjectCache->{$id} = $result;

                # One reference per proxy handed out, matched by the
                # decrement performed for release requests.
                $self->proxyObjectRefCount->{$id}++;

                push @ret, Acme::Thread::RPC::Proxy->new(
                    class    => $refType,
                    proxyBuf => $self->proxyBuf,
                    objectId => $id,
                    threadId => threads->tid,
                );
            }
            1;
        };

        unless ($ok) {
            # Read $@ right away, before anything else can overwrite it.
            $error = $@;
            $error = 'unknown error' unless defined $error && length "$error";
        }
    }

    if (defined $error) {
        # Stringify: an exception object is not a valid shared value.
        $item->{error} = "$error";
    }
    else {
        $item->{returned} = \@ret;
    }

    cond_signal(%$item);

    if ($isExit) {
        $self->exit;
    }
}
- package Acme::Thread::Proxy;
- use Moose;
- use threads::shared;
- use Thread::Queue;
# Queue on which invocation requests for the remote thread are placed.
has proxyBuf => (
    is       => 'ro',
    isa      => 'Thread::Queue',
    required => 1,
);

# Id of the thread that is signalled after a request has been queued.
has threadId => (
    is       => 'ro',
    isa      => 'Int',
    required => 1,
);

our $AUTOLOAD;

# Make the Moose object a shareable object: build it the regular way, then
# hand back a shared clone instead of the thread-private instance.
around 'new' => sub {
    my ($orig, $class, @ctorArgs) = @_;

    my $private = $class->$orig(@ctorArgs);

    return shared_clone($private);
};
# Forward any method that is not defined locally to the remote thread and
# block until it has been answered.
#
# The calling context (void, scalar or list) is sent along with the request
# and decides the shape of the return value: nothing, the first returned
# element, or the whole returned list.
#
# Dies with "method invocation died on remote end: ..." when the remote
# side reports an error.
sub AUTOLOAD {
    my ($self, @args) = @_;
    my $context = wantarray;

    my $name = $AUTOLOAD;
    $name =~ s/.*://; # strip fully-qualified portion

    # A destructor call must never turn into a blocking remote invocation.
    return if $name eq 'DESTROY';

    # Arguments are stored in a shared array, which only accepts shared
    # values: clone references instead of copying them verbatim.
    my @sharedArgs : shared = map { shared_clone($_) } @args;

    my %invocation : shared;
    %invocation = $self->_____subclassInvocationItems;
    $invocation{args}    = \@sharedArgs;
    $invocation{method}  = $name;
    $invocation{context} = $context;

    # Take the lock before the request is queued and keep it until the wait
    # starts.
    lock(%invocation);
    $self->_____addToQueue(\%invocation);
    $self->_____waitForRemoteSide(\%invocation);

    if (exists $invocation{error}) {
        die "method invocation died on remote end: ", $invocation{error};
    }

    #handle void context
    return unless defined $context;

    #handle list context
    return @{ $invocation{returned} } if $context;

    #handle scalar context
    return $invocation{returned}->[0];
}
# Place a request on the proxy queue and signal the target thread with HUP
# so that it looks at the queue.
#
# Arguments:
#   $item - the request to enqueue
#
# Dies, without queueing anything, when no thread object can be obtained
# for threadId: a request that nobody will ever pick up would leave the
# caller waiting forever.
sub _____addToQueue {
    my ($self, $item) = @_;

    my $tid    = $self->threadId;
    my $thread = threads->object($tid)
        or die "cannot queue request: thread $tid is not available";

    # Thread::Queue serialises its own operations, so no outer lock is
    # taken; dereferencing the queue object directly would tie this code to
    # the module's internal representation.
    $self->proxyBuf->enqueue($item);
    $thread->kill('HUP');

    return;
}
# Block on the invocation hash until it carries either a 'returned' or an
# 'error' entry.  The check happens only after each wake-up, so the caller
# must already hold the lock on the hash.
sub _____waitForRemoteSide {
    my ($self, $invocation) = @_;

    do {
        cond_wait(%$invocation);
    } until (exists($invocation->{returned}) || exists($invocation->{error}));

    return;
}
# Extra key/value pairs that AUTOLOAD merges into every invocation request.
# The base proxy contributes none.
sub _____subclassInvocationItems {
    my ($self) = @_;

    return ();
}
- package Acme::Thread::RPC::Proxy;
- use Moose;
- use threads::shared;
- use Thread::Queue;
- use UNIVERSAL::Object::ID;
- use Data::Dumper;
- extends 'Acme::Thread::Proxy';
# Id of the remote object this proxy stands for; sent with every request.
has objectId => (
    is       => 'ro',
    isa      => 'Str',
    required => 1,
);

# Class name recorded for the remote object.
has class => (
    is       => 'ro',
    isa      => 'Str',
    required => 1,
);
# Tell the owning thread that this proxy no longer needs its object by
# queueing a release request.  Nothing is sent when the proxy is destroyed
# inside the thread it points to.
#
# A destructor can run at any moment, including while an exception is
# being propagated, so it neither throws nor disturbs the error variables.
sub DESTROY {
    my ($self) = @_;

    local ($@, $!, $?);

    # During global destruction the thread machinery may already be gone.
    return if defined ${^GLOBAL_PHASE} && ${^GLOBAL_PHASE} eq 'DESTRUCT';

    my $sent = eval {
        return 1 if $self->threadId == threads->tid;

        # No lock is taken: nobody waits for an answer to a release.
        my %invocation : shared;
        $invocation{releaseObject} = 1;
        $invocation{objectId}      = $self->objectId;

        $self->_____addToQueue(\%invocation);
        1;
    };

    warn "could not release remote object: $@" unless $sent;

    return;
}
# Extra key/value pairs merged into every invocation request: the id of
# the remote object the call is aimed at.
sub _____subclassInvocationItems {
    my $self = shift;

    my $id = $self->objectId;

    return (objectId => $id);
}
Add Comment
Please, Sign In to add comment