Guest User

Untitled

a guest
Jan 19th, 2018
77
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.21 KB | None | 0 0
  1. package Acme::Thread;
  2.  
  3. use Moose;
  4. use threads;
  5. use threads::shared;
  6.  
  7. sub spawn {
  8. my ($class, @args) = @_;
  9. my $ready : shared = 0;
  10.  
  11. lock($ready);
  12.  
  13. my $thread = threads->create(sub {
  14. my $self = $class->new(@args);
  15. $self->init;
  16.  
  17. lock($ready);
  18.  
  19. $ready = 1;
  20. cond_signal($ready);
  21.  
  22. $self->run;
  23. });
  24.  
  25. # while(1) {
  26. # warn "waiting on condition";
  27. # cond_wait($ready);
  28. #
  29. # warn "checking condition";
  30. #
  31. # next unless $ready;
  32. # }
  33.  
  34. #TODO argggggg
  35. sleep(2);
  36.  
  37. return $thread;
  38. }
  39.  
  40. sub init {
  41.  
  42. }
  43.  
  44. sub run {
  45.  
  46. }
  47.  
  48. sub exit {
  49. threads->exit;
  50. }
  51.  
  52. package Acme::Thread::RPC::Server;
  53.  
  54. use Moose;
  55. use threads::shared;
  56. use Thread::Queue;
  57. use Data::Dumper;
  58.  
  59. extends 'Acme::Thread';
  60.  
  61. has proxyBuf => (is => 'ro', isa => 'Thread::Queue', required => 1);
  62. has proxyObjectCache => (is => 'ro', isa => 'HashRef', default => sub { {} }, required => 1);
  63. has proxyObjectRefCount => (is => 'ro', isa => 'HashRef', default => sub { {} }, required => 1);
  64.  
  65. sub spawn {
  66. my ($class, @args) = @_;
  67. my $queue = Thread::Queue->new;
  68. my $thread = $class->SUPER::spawn(proxyBuf => $queue);
  69. my $proxy = Acme::Thread::Proxy->new(proxyBuf => $queue, threadId => $thread->tid);
  70.  
  71. return $proxy;
  72. }
  73.  
  74. sub init {
  75. my ($self) = @_;
  76.  
  77. $SIG{HUP} = sub { $self->checkProxyBuf };
  78.  
  79. $self->SUPER::init;
  80. }
  81.  
  82. sub checkProxyBuf {
  83. my ($self) = @_;
  84. my @items;
  85.  
  86. {
  87. lock(@{$self->proxyBuf});
  88. my $queueSize = $self->proxyBuf->pending;
  89. @items = $self->proxyBuf->dequeue_nb($queueSize);
  90. }
  91.  
  92. foreach (@items) {
  93. $self->doRemoteInvocationRequest($_);
  94. }
  95. }
  96.  
  97. sub doRemoteInvocationRequest {
  98. my ($self, $item) = @_;
  99.  
  100. lock(%$item);
  101.  
  102. my $methodName = $item->{method};
  103. my @results;
  104. my @ret : shared;
  105. my $object;
  106.  
  107. if (defined($item->{objectId})) {
  108. $object = $self->proxyObjectCache->{$item->{objectId}};
  109. } else {
  110. $object = $self;
  111. }
  112.  
  113. if (defined($item->{releaseObject})) {
  114. my $refCount = --$self->proxyObjectRefCount->{$item->{objectId}};
  115.  
  116. if ($refCount <= 0) {
  117. delete $self->proxyObjectCache->{$item->{objectId}};
  118. }
  119.  
  120. return;
  121. }
  122.  
  123. #TODO gross hack for now
  124. unless ($methodName eq 'exit') { eval {
  125. unless (defined($item->{context})) {
  126. $object->$methodName(@{$item->{args}});
  127. } elsif ($item->{context}) {
  128. @results = $object->$methodName(@{$item->{args}});
  129. } else {
  130. @results = scalar($object->$methodName(@{$item->{args}}));
  131. }
  132. };};
  133.  
  134. foreach(@results) {
  135. my $refType = ref($_);
  136.  
  137. if ($refType eq '' || $refType eq 'HASH' || $refType eq 'ARRAY' || $refType eq 'SCALAR') {
  138. push(@ret, $_);
  139. } else {
  140. my $objectId : shared = $_->object_id;
  141.  
  142. $self->proxyObjectCache->{$objectId} = $_;
  143. push(@ret, Acme::Thread::RPC::Proxy->new(class => ref($_), proxyBuf => $self->proxyBuf, objectId => $objectId, threadId => threads->tid));
  144. }
  145.  
  146. }
  147.  
  148. if ($@) {
  149. $item->{error} = $@;
  150. } else {
  151. $item->{returned} = \@ret;
  152. }
  153.  
  154. cond_signal(%$item);
  155.  
  156. if ($methodName eq 'exit') {
  157. $self->exit;
  158. }
  159.  
  160. }
  161.  
  162.  
  163. package Acme::Thread::Proxy;
  164.  
  165. use Moose;
  166. use threads::shared;
  167. use Thread::Queue;
  168.  
  169. has proxyBuf => (is => 'ro', isa => 'Thread::Queue', required => 1);
  170. has threadId => (is => 'ro', isa => 'Int', required => 1);
  171.  
  172. our $AUTOLOAD;
  173.  
  174. #make the moose object a sharable object
  175. around 'new' => sub {
  176. my $orig=shift;
  177. my $class=shift;
  178. my $self=$class->$orig(@_);
  179. my $shared_self : shared = shared_clone($self);
  180. return $shared_self;
  181. };
  182.  
  183. sub AUTOLOAD {
  184. my ($self, @args) = @_;
  185. my $name = $AUTOLOAD;
  186. my $context = wantarray;
  187. my %invocation : shared;
  188. my @sharedArgs : shared;
  189.  
  190. @sharedArgs = @args;
  191.  
  192. $name =~ s/.*://; # strip fully-qualified portion
  193.  
  194. %invocation = $self->_____subclassInvocationItems;
  195.  
  196. $invocation{args} = \@sharedArgs;
  197. $invocation{method} = $name;
  198. $invocation{context} = $context;
  199.  
  200. lock(%invocation);
  201.  
  202. $self->_____addToQueue(\%invocation);
  203. $self->_____waitForRemoteSide(\%invocation);
  204.  
  205. if (exists $invocation{error}) {
  206. die "method invocation died on remote end: ", $invocation{error};
  207. }
  208.  
  209. #handle void context
  210. return unless defined $context;
  211. #handle list context
  212. return @{$invocation{returned}} if $context;
  213. #handle scalar context
  214. return $invocation{returned}->[0];
  215. }
  216.  
  217. sub _____addToQueue {
  218. my ($self, $item) = @_;
  219. my $queue = $self->proxyBuf;
  220.  
  221. lock(@$queue);
  222. $self->proxyBuf->enqueue($item);
  223. threads->object($self->threadId)->kill('HUP');
  224.  
  225. return;
  226. }
  227.  
  228. sub _____waitForRemoteSide {
  229. my ($self, $invocation) = @_;
  230.  
  231. while(1) {
  232. cond_wait(%$invocation);
  233.  
  234. last if exists $invocation->{returned};
  235. last if exists $invocation->{error};
  236. }
  237.  
  238. return;
  239.  
  240. }
  241.  
  242. sub _____subclassInvocationItems {
  243. return ();
  244. }
  245.  
  246. package Acme::Thread::RPC::Proxy;
  247.  
  248. use Moose;
  249. use threads::shared;
  250. use Thread::Queue;
  251. use UNIVERSAL::Object::ID;
  252. use Data::Dumper;
  253.  
  254. extends 'Acme::Thread::Proxy';
  255.  
  256. has objectId => (is => 'ro', isa => 'Str', required => 1);
  257. has class => (is => 'ro', isa => 'Str', required => 1);
  258.  
  259. sub DESTROY {
  260. my ($self) = @_;
  261. my %invocation : shared;
  262.  
  263. return if $self->threadId == threads->tid;
  264.  
  265. lock(%invocation);
  266.  
  267. $invocation{releaseObject} = 1;
  268. $invocation{objectId} = $self->objectId;
  269.  
  270. $self->_____addToQueue(\%invocation);
  271.  
  272. return;
  273. }
  274.  
  275. sub _____subclassInvocationItems {
  276. my ($self) = @_;
  277.  
  278. return (objectId => $self->objectId);
  279. }
Add Comment
Please, Sign In to add comment