Advertisement
Guest User

Untitled

a guest
Jul 31st, 2012
307
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 12.22 KB | None | 0 0
  1. package main;
  2.  
  3. import java.util.AbstractMap;
  4. import java.util.ArrayList;
  5. import java.util.Calendar;
  6. import java.util.Collections;
  7. import java.util.Date;
  8. import java.util.List;
  9. import java.util.Map;
  10. import java.util.Map.Entry;
  11. import java.util.Random;
  12. import java.util.Timer;
  13. import java.util.TimerTask;
  14.  
  15. import org.elasticsearch.action.bulk.BulkRequestBuilder;
  16. import org.elasticsearch.action.bulk.BulkResponse;
  17. import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
  18. import org.elasticsearch.action.index.IndexRequestBuilder;
  19. import org.elasticsearch.action.search.SearchRequestBuilder;
  20. import org.elasticsearch.action.search.SearchResponse;
  21. import org.elasticsearch.client.Client;
  22. import org.elasticsearch.common.xcontent.XContentBuilder;
  23. import org.elasticsearch.common.xcontent.XContentFactory;
  24. import org.elasticsearch.index.query.FilterBuilders;
  25. import org.elasticsearch.index.query.QueryBuilder;
  26. import org.elasticsearch.index.query.QueryBuilders;
  27. import org.elasticsearch.index.query.QueryStringQueryBuilder.Operator;
  28. import org.elasticsearch.indices.IndexMissingException;
  29. import org.elasticsearch.node.NodeBuilder;
  30. import org.elasticsearch.search.sort.SortOrder;
  31. import org.slf4j.Logger;
  32. import org.slf4j.LoggerFactory;
  33.  
  34. import com.google.gson.Gson;
  35.  
  36. public class ElasticSearchBug {
  37.  
  38.     private static String charset = "abcdefghijklmnopqrstuvwxyz";
  39.  
  40.     public static String word()
  41.     {
  42.         Random rand = new Random();
  43.         StringBuilder sb = new StringBuilder();
  44.         for (int i = 0; i < 4; i++)
  45.         {
  46.             int c = rand.nextInt(charset.length());
  47.             sb.append(charset.charAt(c));
  48.         }
  49.         sb.append(" ");
  50.         return sb.toString();
  51.     }
  52.  
  53.     public static String content()
  54.     {
  55.         Random rand = new Random();
  56.         int s = 1 + rand.nextInt(30);
  57.         StringBuilder sb = new StringBuilder();
  58.         for (int i = 0; i < s; i++)
  59.         {
  60.             sb.append(word());
  61.         }
  62.         return sb.toString();
  63.     }
  64.  
  65.     public static void main(String[] args) throws InterruptedException
  66.     {
  67.         ElasticSearchIndexerBug es = new ElasticSearchIndexerBug();
  68.         SearchTask st0 = new SearchTask(es);
  69.         InsertTask it0 = new InsertTask(es);
  70.  
  71.         st0.start();
  72.         it0.start();
  73.  
  74.         st0.join();
  75.         it0.join();
  76.     }
  77.  
  78.     private static class SearchTask
  79.         extends Thread {
  80.         private ElasticSearchIndexerBug es;
  81.         private Date date = new Date();
  82.  
  83.         public SearchTask(ElasticSearchIndexerBug es)
  84.         {
  85.             this.es = es;
  86.         }
  87.  
  88.         @Override
  89.         public void run()
  90.         {
  91.             for (int i = 0; i < 1000; i++)
  92.             {
  93.                 es.search(word(), "TWITTER", date.getTime(), "EN");
  94.                 try
  95.                 {
  96.                     sleep(50);
  97.                 }
  98.                 catch (InterruptedException e)
  99.                 {
  100.                     e.printStackTrace();
  101.                 }
  102.             }
  103.  
  104.         }
  105.     }
  106.  
  107.     private static class InsertTask
  108.         extends Thread {
  109.         private ElasticSearchIndexerBug es;
  110.         private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchIndexerBug.class);
  111.  
  112.         public InsertTask(ElasticSearchIndexerBug es)
  113.         {
  114.             this.es = es;
  115.         }
  116.  
  117.         @Override
  118.         public void run()
  119.         {
  120.             LOG.debug("starting InsertTask");
  121.             for (int i = 0; i < 1000; i++)
  122.             {
  123.                 PostBug p = new PostBug();
  124.                 p.setContent(content());
  125.                 p.setUploadDate(Calendar.getInstance());
  126.                 p.setLanguage("EN");
  127.                 System.out.println("indexing: " + p.getExtendedContent());
  128.                
  129.                 es.addIndex(p);
  130.                 try
  131.                 {
  132.                     sleep(50);
  133.                 }
  134.                 catch (InterruptedException e)
  135.                 {
  136.                     e.printStackTrace();
  137.                 }
  138.             }
  139.         }
  140.     }
  141.  
  142.     private static class PostBug {
  143.         private String content;
  144.         private Calendar uploadDate;
  145.         private String language;
  146.  
  147.         public void setContent(String content)
  148.         {
  149.             this.content = content;
  150.         }
  151.  
  152.         public String getExtendedContent()
  153.         {
  154.             return this.content;
  155.         }
  156.  
  157.         public void setUploadDate(Calendar uploadDate)
  158.         {
  159.             this.uploadDate = uploadDate;
  160.         }
  161.  
  162.         public Calendar getUploadDate()
  163.         {
  164.             return this.uploadDate;
  165.         }
  166.  
  167.         public void setLanguage(String language)
  168.         {
  169.             this.language = language;
  170.         }
  171.  
  172.         public String getLanguage()
  173.         {
  174.             return this.language;
  175.         }
  176.  
  177.     }
  178.  
  179.     private static class ProfileBug {
  180.     }
  181.  
  182.     private static class ElasticSearchIndexerBug {
  183.         private static final String CONTENT_FIELD = "content";
  184.         private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchIndexerBug.class);
  185.         private int MAX_LIST_SIZE = 1000;
  186.         private int REPEAT_INTERVAL = 100;
  187.         private static final long EXPIRE_DATE = 20 * 60 * 1000;
  188.         private static final String POSTS_INDEX = "posts";
  189.         private org.elasticsearch.node.Node node;
  190.         private Client client;
  191.         private Timer timer;
  192.         private List<Entry<PostBug, ProfileBug>> postIndex;
  193.  
  194.         public ElasticSearchIndexerBug()
  195.         {
  196.             postIndex = Collections.synchronizedList(new ArrayList<Entry<PostBug, ProfileBug>>());
  197.             this.node = NodeBuilder.nodeBuilder().node();
  198.             this.client = this.node.client();
  199.             this.client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
  200.          
  201.             this.timer = new Timer();
  202.             this.timer.schedule(new IndexerTask(this), REPEAT_INTERVAL, REPEAT_INTERVAL);
  203.         }
  204.  
  205.         public PostBug addIndex(PostBug post)
  206.         {
  207.             LOG.debug("Indexing new post {} without profile", "api");
  208.  
  209.             try
  210.             {
  211.                 synchronized (postIndex)
  212.                 {
  213.                     Map.Entry<PostBug, ProfileBug> entry = new AbstractMap.SimpleEntry<PostBug, ProfileBug>(post, null);
  214.                     postIndex.add(entry);
  215.                 }
  216.  
  217.                 if (postIndex.size() > MAX_LIST_SIZE)
  218.                 {
  219.                     updatePostIndexes();
  220.                 }
  221.             }
  222.             catch (Exception e)
  223.             {
  224.                 LOG.error("Could not index tweet ", e);
  225.                 return null;
  226.             }
  227.  
  228.             return post;
  229.         }
  230.  
  231.         public List<Entry<PostBug, ProfileBug>> search(String query, String source, Long lastCollected, String language)
  232.         {
  233.             LOG.debug("Search in elasticsearch using {} for source {}", query, source);
  234.             LOG.debug("Search in elasticsearch using {} language {}", query, language);
  235.  
  236.             SearchResponse response = null;
  237.             SearchRequestBuilder request = client.prepareSearch(POSTS_INDEX);
  238.  
  239.             try
  240.             {
  241.                 request.setFilter(FilterBuilders.numericRangeFilter("uploadDate").gte(lastCollected));
  242.                 QueryBuilder qb;
  243.  
  244.                 if (language.equals("ALL"))
  245.                 {
  246.                     qb = QueryBuilders.queryString(query).analyzer("standard").defaultField(CONTENT_FIELD).defaultOperator(Operator.AND).phraseSlop(0).allowLeadingWildcard(false);
  247.                 }
  248.                 else
  249.                 {
  250.                     qb = QueryBuilders.filteredQuery(QueryBuilders.queryString(query).analyzer("standard").defaultField(CONTENT_FIELD).defaultOperator(Operator.AND).phraseSlop(0).allowLeadingWildcard(false), FilterBuilders.inFilter("language", language.toString().toUpperCase()));
  251.                 }
  252.  
  253.                 request.setQuery(qb);
  254.                 request.setTypes(source.toString().toUpperCase());
  255.                 request.addSort("uploadDate", SortOrder.ASC);
  256.                 request.setSize(MAX_LIST_SIZE);
  257.                 request.setFrom(0);
  258.                 request.setExplain(false);
  259.                
  260.                 //LOG.debug("{}",request);
  261.                
  262.                 // Execute get
  263.                 response = request.execute().actionGet();
  264.             }
  265.             catch (IndexMissingException e)
  266.             {
  267.                 LOG.warn("Index not created - {}", e.getMessage());
  268.             }
  269.             catch (Exception e)
  270.             {
  271.                 LOG.error("Could not search - {}", e.getMessage());
  272.                 e.printStackTrace();
  273.             }
  274.  
  275.             if (response == null) return new ArrayList<Map.Entry<PostBug, ProfileBug>>(0);
  276.  
  277.             List<Entry<PostBug, ProfileBug>> result = new ArrayList<Map.Entry<PostBug, ProfileBug>>();
  278.  
  279.             return result;
  280.         }
  281.  
  282.         public void updatePostIndexes()
  283.         {
  284.             try
  285.             {
  286.                 expireOldData();
  287.  
  288.                 // Bulk index tweets
  289.                 BulkRequestBuilder bulkRequest = client.prepareBulk();
  290.                 synchronized (postIndex)
  291.                 {
  292.                     if (this.postIndex.isEmpty()) { return; }
  293.  
  294.                     for (Entry<PostBug, ProfileBug> entry : postIndex)
  295.                     {
  296.                         Gson postGson = new Gson();
  297.                         Gson profileGson = new Gson();
  298.  
  299.                         XContentBuilder content = XContentFactory.jsonBuilder().startObject();
  300.                         content.field(CONTENT_FIELD, entry.getKey().getExtendedContent());
  301.                         content.field("post", postGson.toJson(entry.getKey()));
  302.                         content.field("profile", profileGson.toJson(entry.getValue()));
  303.                         if (entry.getKey().getLanguage() == null)
  304.                         {
  305.                             // language not known
  306.                             content.field("language", "ALL");
  307.                         }
  308.                         else
  309.                         {
  310.                             content.field("language", entry.getKey().getLanguage().toString().toUpperCase());
  311.                         }
  312.                         content.field("uploadDate", entry.getKey().getUploadDate().getTimeInMillis());
  313.                         content.endObject();
  314.  
  315.                         IndexRequestBuilder request = client.prepareIndex(POSTS_INDEX, "TWITTER");
  316.                         request.setSource(content);
  317.  
  318.                         bulkRequest.add(request);
  319.                     }
  320.                     postIndex.clear();
  321.                 }
  322.  
  323.                 BulkResponse bulkResponse = bulkRequest.execute().actionGet();
  324.                 //client.admin().indices().prepareRefresh(POSTS_INDEX);
  325.  
  326.                 if (bulkResponse.hasFailures())
  327.                 {
  328.                     LOG.warn("Couldnt index all posts {}", bulkResponse.buildFailureMessage());
  329.                 }
  330.             }
  331.             catch (Exception e)
  332.             {
  333.                 LOG.error("Some error occured while trying to update indexes", e);
  334.             }
  335.         }
  336.  
  337.         public void expireOldData()
  338.         {
  339.             Calendar oldDate = Calendar.getInstance();
  340.             oldDate.setTimeInMillis(oldDate.getTimeInMillis() - EXPIRE_DATE);
  341.  
  342.             try
  343.             {
  344.                 DeleteByQueryRequestBuilder request = client.prepareDeleteByQuery(POSTS_INDEX);
  345.                 request.setQuery(QueryBuilders.rangeQuery("uploadDate").lte(oldDate.getTimeInMillis()));
  346.                 request.execute().actionGet();
  347.             }
  348.             catch (Exception e)
  349.             {
  350.                 LOG.warn("Could NOT expire old tweets {} ", e.getMessage());
  351.             }
  352.         }
  353.  
  354.         private class IndexerTask
  355.             extends TimerTask {
  356.             private ElasticSearchIndexerBug indexer;
  357.  
  358.             public IndexerTask(ElasticSearchIndexerBug elasticSearchIndexerBug)
  359.             {
  360.                 this.indexer = elasticSearchIndexerBug;
  361.             }
  362.  
  363.             @Override
  364.             public void run()
  365.             {
  366.                 indexer.updatePostIndexes();
  367.             }
  368.         }
  369.     }
  370.  
  371. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement