Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- my $fh;
- if (!open($fh, '<', $prefix.$source)) {
- print logDate()."Unable to open $source for reading: $!\n";
- return 0;
- }
- binmode($fh);
- my $contentLength = -s $source;
- print logDate()."contentLength: $contentLength\n" if ($this->{'verbose'}>1);
- my $md5 = Digest::MD5->new();
- my $startTime = time();
- my $req = $this->makeRequest('PUT', $target.$source, {'x-amz-meta-lstat'=>join(':', @$lstat)}, {'Content-Length'=>$contentLength});
- print logDate()."request : \n".Dumper($req) if ($this->{'verbose'}>1);
- print logDate()."writing to buffer\n" if ($this->{'verbose'}>1);
- my $uTime = time();
- $ua->do_request(
- request => $req,
- # Probably duplicating a load of logic here :(
- host => $host,
- port => $port || $url->scheme || 80,
- SSL => $url->scheme eq 'https' ? 1 : 0,
- # We override the default behaviour (pulling content from HTTP::Request) by passing a callback explicitly
- request_body => sub {
- my ($stream) = @_;
- # This part is the important one - read some data, and eventually return it
- my $read = sysread $fh, my $buffer, 32768;
- $md5->add($buffer);
- print "o" if ($this->{'verbose'}>5);
- # if ($this->{'bwlimit'}) {
- # my $myBW = $bytesRead/(time()-$uTime)/1000000;
- # if ($myBW>$this->{'bwlimit'}) {
- # my $usleep = ((1000/$this->{'bwlimit'})-(1000/($myBW-$this->{'bwlimit'})))*1000*1.1;
- # usleep($usleep) if ($usleep>0 && $contentLength>1000000);
- # }
- # }
- $uTime = time();
- return $buffer if $read;
- # Don't really need to close here, but might as well clean up as soon as we're ready
- close $fh or warn $!;
- undef $fh;
- return;
- },
- on_response => sub {
- my ($response) = @_;
- if($fh) {
- close $fh or die("Unable to close $target: $!");
- }
- $this->{'loop'}->remove($ua);
- $this->{'loopCnt'}--;
- if ($response->code == 500) {
- push(@{$this->{'todo'}}, $source);
- return;
- }
- print "\n" if ($this->{'verbose'});
- my $digest = $md5->hexdigest();
- my $etag = $response->header('ETag');
- $etag =~ s/"//g;
- $response->header('ETag', $etag);
- die ("noEtag".Dumper($response)) if (!$response->header('ETag'));
- die ("MD5 Sum ".$digest." did not match ".$response->header('ETag')."!") if ($digest ne $response->header('ETag'));
- my $diffTime = time()-$startTime;
- $diffTime=1 if (!$diffTime);
- print logDate()."completed: ".sprintf("%.2f", $contentLength*8/1000000/$diffTime)."Mb/s\n\n" if ($this->{'verbose'});
- },
- on_error => sub {
- my ( $message ) = @_;
- if($fh) {
- close $fh or die $!;
- }
- $this->{'loop'}->remove($ua);
- $this->{'loopCnt'}--;
- print STDERR "Failed - $message\n";
- # Could do a $loop->loop_stop here - some failures should be fatal!
- }
- );
Add Comment
Please, Sign In to add comment