Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- my $cursor = do {
- $result = $elastic->client->search(
- index => 'log',
- type => 'log',
- query => $query,
- size => $bulk_size,
- scroll => $scroll,
- search_type => 'scan',
- );
- sub {
- my $rs = $elastic->client->scroll(
- scroll_id => $result->{_scroll_id},
- scroll => $scroll,
- );
- $rs = 0 unless defined $rs->{hits}->{hits}->[0];
- $rs;
- };
- };
- while( $result = $cursor->() ) {
- for( @{ $result->{hits}->{hits} } ) {
- # dumper le document en json et l'écrire sur une ligne dans un fichier
- }
- }
- -----
- my $index = sub {
- my $bulk = shift;
- $elastic->client->bulk( $bulk, refresh => 0 );
- };
- my $i = 0;
- my $bulk = [];
- foreach my $file ( @files ) {
- open( my $in, '<', $file ) or die "can't open file '$file' : $!\n";
- while( my $line = <$in> ) {
- try {
- my $tweet = $json->decode( $line );
- push( @$bulk, {
- index => {
- index => 'log',
- type => 'log',
- id => "".delete($tweet->{id}),
- data => $tweet
- }
- } );
- } catch {
- print STDERR (defined $_? $_ : '')." - line : ".$line;
- };
- print STDERR $eta->tick;
- if( ++$i > $bulk_size ) {
- try {
- $index->( $bulk );
- } catch {
- print STDERR $_;
- };
- $bulk = [];
- $i = 0;
- }
- }
- close $file;
- }
- if( $i ) {
- try { $index->( $bulk ); } catch { print STDERR $_; };
- }
- $elastic->client->refresh_index( index => $elastic->index );
Add Comment
Please, Sign In to add comment