Advertisement
Guest User

ES Query

a guest
Oct 22nd, 2015
46
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 4.55 KB | None | 0 0
  1. public class QueryRunner {
  2.     public static void run(String jobName, String index, String type, String queryPath, String aggsPath, String resultPath, Client client, SearchType searchType, int size, int timeOut, List<SortBuilder> sorts, boolean fields, String... fieldss)
  3.     {
  4.         double startSearchL = 0;
  5.         long hits = 0;
  6.         double totalBytes = 0;
  7.         double maxBytes = 0;
  8.  
  9.         XContentBuilder builder = null;
  10.         FileOutputStream fop = null;
  11.  
  12.         try {
  13.  
  14.             String query = readFileISO_8859_1(queryPath);
  15.             String aggs = new String();
  16.  
  17.             if(!aggsPath.isEmpty())
  18.                 aggs = readFileISO_8859_1(aggsPath);
  19.  
  20.             File batchFile = new File(resultPath);
  21.             fop = new FileOutputStream(batchFile);
  22.  
  23.             if (!batchFile.exists()) {
  24.                 batchFile.createNewFile();
  25.             }
  26.             else {
  27.                 batchFile.delete();
  28.             }
  29.  
  30.             startSearchL = System.currentTimeMillis();
  31.  
  32.             SearchRequestBuilder srb = client.prepareSearch(index).setTypes(type)
  33.                     .setQuery(query.getBytes())
  34.                     .setSearchType(searchType)
  35.                     .setScroll(new TimeValue(timeOut))
  36.                     .setSize(size);
  37.  
  38.             for(SortBuilder sort : sorts) {
  39.                 srb.addSort(sort);
  40.             }
  41.  
  42.             if(!aggs.isEmpty())
  43.                 srb.setAggregations(aggs.getBytes());
  44.             else
  45.                 if(fields)
  46.                     srb.addFields(fieldss);
  47.  
  48.  
  49.             SearchResponse scrollResp =  srb.execute().actionGet();
  50.    
  51.             hits = scrollResp.getHits().getTotalHits();
  52.            
  53.             //System.out.println(String.format("%40s - Hits: %d", jobName, hits));
  54.    
  55.             if(scrollResp.getAggregations() != null) {
  56.                 builder = XContentFactory.jsonBuilder();
  57.                 builder.startObject();
  58.                 scrollResp.toXContent(builder, ToXContent.EMPTY_PARAMS);
  59.                 builder.endObject();
  60.    
  61.                 fop.write(builder.string().getBytes());        
  62.             } else {
  63.                 String header = new String();
  64.                 boolean headerWritten = false;
  65.                
  66.                 int count = 0;
  67.                
  68.                 while (true) {
  69.                     //System.out.println("Host: " + scrollResp.remoteAddress().toString());
  70.                     //System.out.println("Took (ms): " + scrollResp.getTookInMillis());
  71.                     //System.out.println(count++);
  72.    
  73.                     for (SearchHit hit : scrollResp.getHits().getHits()) {                 
  74.                         String line = new String();
  75.  
  76.                         if(!fields)
  77.                             line = hit.sourceAsString() + "\r\n";
  78.                         else
  79.                         {
  80.                             for(Entry<String, SearchHitField> e : hit.fields().entrySet()){
  81.                                 header += "\"" + e.getKey() + "\",";
  82.                             }
  83.  
  84.                             if(!headerWritten) {
  85.                                 header += "\r\n";
  86.                                 //System.out.println(header);
  87.                                 fop.write(header.getBytes());
  88.                                 headerWritten = true;
  89.                             }
  90.  
  91.                             for(Entry<String, SearchHitField> e : hit.fields().entrySet()){
  92.                                 line += "\"" + e.getValue().value().toString() + "\",";
  93.                             }
  94.        
  95.                             line += "\r\n";
  96.                         }
  97.                            
  98.                         if(line.length() > maxBytes)
  99.                             maxBytes = line.length();
  100.                        
  101.                         totalBytes += line.length();
  102.    
  103.                        byte[] bytes = line.getBytes();
  104.                        fop.write(bytes);
  105.                     }
  106.    
  107.                     scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
  108.    
  109.                     if (scrollResp.getHits().getHits().length == 0) {
  110.                         break;
  111.                     }
  112.                 }
  113.             }
  114.         } catch(Exception ex) {
  115.             System.out.println("Job Failed: " + jobName);
  116.             ex.printStackTrace();
  117.         }
  118.         finally {
  119.        
  120.             double endSearchL = System.currentTimeMillis();
  121.        
  122.             double totalTimeMins = ((endSearchL - startSearchL)/1000/60);
  123.             double totalTimeSecs = ((endSearchL - startSearchL)/1000);
  124.    
  125.             double avgDocSize = hits > 0 ? (totalBytes/hits/1024) : 0;
  126.             double maxDocSize = (maxBytes/1024);
  127.            
  128.             System.out.println(String.format("%40s - Total time: %10.5f(mins), %10.5f(secs) - Avg doc size: %10.5f(kb) - Max doc size: %10.5f(kb) - Hits: %10d", jobName, totalTimeMins, totalTimeSecs, avgDocSize, maxDocSize, hits));
  129.            
  130.            
  131.            
  132.             //System.out.println(jobName + " - Total time: " + totalTimeMins + "(mins), " + totalTimeSecs + "(secs)" + " - Avg doc size: "  + avgDocSize + "(kb)" + " - Max doc size: "  + maxDocSize + "(kb)");
  133.            
  134.             if(builder != null)
  135.                 builder.close();
  136.  
  137.             if(fop != null) {
  138.                 try {
  139.                     fop.flush();
  140.                     fop.close();
  141.                 } catch(Exception ex) {
  142.                     ex.printStackTrace();
  143.                 }
  144.             }          
  145.         }
  146.     }  
  147.    
  148.     public static String readFileISO_8859_1(String path) throws IOException {
  149.         return readFile(path, StandardCharsets.ISO_8859_1);    
  150.     }  
  151.  
  152.     public static String readFile(String path, Charset encoding) throws IOException {
  153.         byte[] encoded = Files.readAllBytes(Paths.get(path));
  154.         return new String(encoded, encoding);
  155.     }  
  156. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement