Advertisement
Guest User

Untitled

a guest
Jul 30th, 2012
543
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 11.58 KB | None | 0 0
  1. package main;
  2.  
  3. import java.util.AbstractMap;
  4. import java.util.ArrayList;
  5. import java.util.Calendar;
  6. import java.util.Collections;
  7. import java.util.Date;
  8. import java.util.List;
  9. import java.util.Map;
  10. import java.util.Random;
  11. import java.util.Timer;
  12. import java.util.TimerTask;
  13. import java.util.Map.Entry;
  14.  
  15. import org.elasticsearch.action.bulk.BulkRequestBuilder;
  16. import org.elasticsearch.action.bulk.BulkResponse;
  17. import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
  18. import org.elasticsearch.action.index.IndexRequestBuilder;
  19. import org.elasticsearch.action.search.SearchRequestBuilder;
  20. import org.elasticsearch.action.search.SearchResponse;
  21. import org.elasticsearch.client.Client;
  22. import org.elasticsearch.common.xcontent.XContentBuilder;
  23. import org.elasticsearch.common.xcontent.XContentFactory;
  24. import org.elasticsearch.index.query.FilterBuilders;
  25. import org.elasticsearch.index.query.QueryBuilder;
  26. import org.elasticsearch.index.query.QueryBuilders;
  27. import org.elasticsearch.index.query.QueryStringQueryBuilder.Operator;
  28. import org.elasticsearch.node.NodeBuilder;
  29. import org.elasticsearch.search.sort.SortOrder;
  30. import org.slf4j.Logger;
  31. import org.slf4j.LoggerFactory;
  32.  
  33. import com.google.gson.Gson;
  34.  
  35. public class ElasticSearchBug {
  36.  
  37.     private static String charset = "abcdefghijklmnopqrstuvwxyz";
  38.  
  39.     public static String word()
  40.     {
  41.         Random rand = new Random();
  42.         StringBuilder sb = new StringBuilder();
  43.         for (int i = 0; i < 4; i++)
  44.         {
  45.             int c = rand.nextInt(charset.length());
  46.             sb.append(charset.charAt(c));
  47.         }
  48.         return sb.toString();
  49.     }
  50.  
  51.     public static String content()
  52.     {
  53.         Random rand = new Random();
  54.         int s = rand.nextInt(30);
  55.         StringBuilder sb = new StringBuilder();
  56.         for (int i = 0; i < s; i++)
  57.         {
  58.             sb.append(word());
  59.         }
  60.         return sb.toString();
  61.     }
  62.  
  63.     public static void main(String[] args) throws InterruptedException
  64.     {
  65.         ElasticSearchIndexerBug es = new ElasticSearchIndexerBug();
  66.         SearchTask st0 = new SearchTask(es);
  67.         InsertTask it0 = new InsertTask(es);
  68.  
  69.         st0.run();
  70.         it0.run();
  71.  
  72.         st0.join();
  73.         it0.join();
  74.     }
  75.  
  76.     private static class SearchTask
  77.         extends Thread {
  78.         private ElasticSearchIndexerBug es;
  79.         private Date date = new Date();
  80.  
  81.         public SearchTask(ElasticSearchIndexerBug es)
  82.         {
  83.             this.es = es;
  84.         }
  85.  
  86.         @Override
  87.         public void run()
  88.         {
  89.             for (int i = 0; i < 1000; i++)
  90.             {
  91.                 es.search(word(), "TWITTER", date.getTime(), "EN");
  92.                 try
  93.                 {
  94.                     sleep(10);
  95.                 }
  96.                 catch (InterruptedException e)
  97.                 {
  98.                     e.printStackTrace();
  99.                 }
  100.             }
  101.  
  102.         }
  103.     }
  104.  
  105.     private static class InsertTask
  106.         extends Thread {
  107.         private ElasticSearchIndexerBug es;
  108.  
  109.         public InsertTask(ElasticSearchIndexerBug es)
  110.         {
  111.             this.es = es;
  112.         }
  113.  
  114.         @Override
  115.         public void run()
  116.         {
  117.             for (int i = 0; i < 500; i++)
  118.             {
  119.                 PostBug p = new PostBug();
  120.                 p.setContent(content());
  121.                 p.setUploadDate(Calendar.getInstance());
  122.                 p.setLanguage("EN");
  123.                 es.addIndex(p);
  124.                 try
  125.                 {
  126.                     sleep(50);
  127.                 }
  128.                 catch (InterruptedException e)
  129.                 {
  130.                     e.printStackTrace();
  131.                 }
  132.             }
  133.         }
  134.     }
  135.  
  136.     private static class PostBug {
  137.         private String content;
  138.         private Calendar uploadDate;
  139.         private String language;
  140.  
  141.         public void setContent(String content)
  142.         {
  143.             this.content = content;
  144.         }
  145.  
  146.         public String getExtendedContent()
  147.         {
  148.             return this.content;
  149.         }
  150.  
  151.         public void setUploadDate(Calendar uploadDate)
  152.         {
  153.             this.uploadDate = uploadDate;
  154.         }
  155.  
  156.         public Calendar getUploadDate()
  157.         {
  158.             return this.uploadDate;
  159.         }
  160.  
  161.         public void setLanguage(String language)
  162.         {
  163.             this.language = language;
  164.         }
  165.  
  166.         public String getLanguage()
  167.         {
  168.             return this.language;
  169.         }
  170.  
  171.     }
  172.  
  173.     private static class ProfileBug {
  174.     }
  175.  
  176.     private static class ElasticSearchIndexerBug {
  177.         private static final String CONTENT_FIELD = "content";
  178.         private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchIndexerBug.class);
  179.         private int MAX_LIST_SIZE = 1000;
  180.         private int REPEAT_INTERVAL = 1000;
  181.         private static final long EXPIRE_DATE = 20 * 60 * 1000;
  182.         private static final String POSTS_INDEX = "posts";
  183.         private org.elasticsearch.node.Node node;
  184.         private Client client;
  185.         private Timer timer;
  186.         private List<Entry<PostBug, ProfileBug>> postIndex;
  187.  
  188.         public ElasticSearchIndexerBug()
  189.         {
  190.             postIndex = Collections.synchronizedList(new ArrayList<Entry<PostBug, ProfileBug>>());
  191.             this.node = NodeBuilder.nodeBuilder().node();
  192.             this.client = this.node.client();
  193.             this.timer = new Timer();
  194.             this.timer.schedule(new IndexerTask(this), REPEAT_INTERVAL, REPEAT_INTERVAL);
  195.         }
  196.  
  197.         public PostBug addIndex(PostBug post)
  198.         {
  199.             LOG.debug("Indexing new post {} without profile", "api");
  200.  
  201.             try
  202.             {
  203.                 synchronized (postIndex)
  204.                 {
  205.                     Map.Entry<PostBug, ProfileBug> entry = new AbstractMap.SimpleEntry<PostBug, ProfileBug>(post, null);
  206.                     postIndex.add(entry);
  207.                 }
  208.  
  209.                 if (postIndex.size() > MAX_LIST_SIZE)
  210.                 {
  211.                     updatePostIndexes();
  212.                 }
  213.             }
  214.             catch (Exception e)
  215.             {
  216.                 LOG.error("Could not index tweet ", e);
  217.                 return null;
  218.             }
  219.  
  220.             return post;
  221.         }
  222.  
  223.         public List<Entry<PostBug, ProfileBug>> search(String query, String source, Long lastCollected, String language)
  224.         {
  225.             LOG.debug("Search in elasticsearch using {} for source {}", query, source);
  226.             LOG.debug("Search in elasticsearch using {} language {}", query, language);
  227.  
  228.             SearchResponse response = null;
  229.             SearchRequestBuilder request = client.prepareSearch(POSTS_INDEX);
  230.  
  231.             try
  232.             {
  233.                 request.setFilter(FilterBuilders.numericRangeFilter("uploadDate").gte(lastCollected));
  234.                 QueryBuilder qb;
  235.  
  236.                 if (language.equals("ALL"))
  237.                 {
  238.                     qb = QueryBuilders.queryString(query).analyzer("standard").defaultField(CONTENT_FIELD).defaultOperator(Operator.AND).phraseSlop(0).allowLeadingWildcard(false);
  239.                 }
  240.                 else
  241.                 {
  242.                     qb = QueryBuilders.filteredQuery(QueryBuilders.queryString(query).analyzer("standard").defaultField(CONTENT_FIELD).defaultOperator(Operator.AND).phraseSlop(0).allowLeadingWildcard(false), FilterBuilders.inFilter("language", language.toString().toUpperCase()));
  243.                 }
  244.  
  245.                 request.setQuery(qb);
  246.                 request.setTypes(source.toString().toUpperCase());
  247.                 request.addSort("uploadDate", SortOrder.ASC);
  248.                 request.setSize(MAX_LIST_SIZE);
  249.                 request.setFrom(0);
  250.                 request.setExplain(true);
  251.  
  252.                 LOG.debug("Request {} ", request);
  253.  
  254.                 // Execute get
  255.                 response = request.execute().actionGet();
  256.             }
  257.             catch (Exception e)
  258.             {
  259.                 LOG.warn("Could not search - {}", e.getMessage());
  260.                 e.printStackTrace();
  261.             }
  262.  
  263.             if (response == null) return new ArrayList<Map.Entry<PostBug, ProfileBug>>(0);
  264.  
  265.             List<Entry<PostBug, ProfileBug>> result = new ArrayList<Map.Entry<PostBug, ProfileBug>>();
  266.  
  267.             return result;
  268.         }
  269.  
  270.         public void updatePostIndexes()
  271.         {
  272.             try
  273.             {
  274.                 expireOldData();
  275.  
  276.                 // Bulk index tweets
  277.                 BulkRequestBuilder bulkRequest = client.prepareBulk();
  278.                 synchronized (postIndex)
  279.                 {
  280.                     if (this.postIndex.isEmpty()) { return; }
  281.  
  282.                     for (Entry<PostBug, ProfileBug> entry : postIndex)
  283.                     {
  284.                         Gson postGson = new Gson();
  285.                         Gson profileGson = new Gson();
  286.  
  287.                         XContentBuilder content = XContentFactory.jsonBuilder().startObject();
  288.                         content.field(CONTENT_FIELD, entry.getKey().getExtendedContent());
  289.                         content.field("post", postGson.toJson(entry.getKey()));
  290.                         content.field("profile", profileGson.toJson(entry.getValue()));
  291.                         if (entry.getKey().getLanguage() == null)
  292.                         {
  293.                             // language not known
  294.                             content.field("language", "ALL");
  295.                         }
  296.                         else
  297.                         {
  298.                             content.field("language", entry.getKey().getLanguage().toString().toUpperCase());
  299.                         }
  300.                         content.field("uploadDate", entry.getKey().getUploadDate().getTimeInMillis());
  301.                         content.endObject();
  302.  
  303.                         IndexRequestBuilder request = client.prepareIndex(POSTS_INDEX, "TWITTER");
  304.                         request.setSource(content);
  305.  
  306.                         bulkRequest.add(request);
  307.                     }
  308.                     postIndex.clear();
  309.                 }
  310.  
  311.                 BulkResponse bulkResponse = bulkRequest.execute().actionGet();
  312.  
  313.                 if (bulkResponse.hasFailures())
  314.                 {
  315.                     LOG.warn("Couldnt index all posts {}", bulkResponse.buildFailureMessage());
  316.                 }
  317.                 client.close();
  318.             }
  319.             catch (Exception e)
  320.             {
  321.                 LOG.error("Some error occured while trying to update indexes", e);
  322.             }
  323.         }
  324.  
  325.         public void expireOldData()
  326.         {
  327.             Calendar oldDate = Calendar.getInstance();
  328.             oldDate.setTimeInMillis(oldDate.getTimeInMillis() - EXPIRE_DATE);
  329.  
  330.             try
  331.             {
  332.                 DeleteByQueryRequestBuilder request = client.prepareDeleteByQuery(POSTS_INDEX);
  333.                 request.setQuery(QueryBuilders.rangeQuery("uploadDate").lte(oldDate.getTimeInMillis()));
  334.                 request.execute().actionGet();
  335.             }
  336.             catch (Exception e)
  337.             {
  338.                 LOG.warn("Could NOT expire old tweets {} ", e.getMessage());
  339.             }
  340.  
  341.         }
  342.  
  343.         private class IndexerTask
  344.             extends TimerTask {
  345.             private ElasticSearchIndexerBug indexer;
  346.  
  347.             public IndexerTask(ElasticSearchIndexerBug elasticSearchIndexerBug)
  348.             {
  349.                 this.indexer = elasticSearchIndexerBug;
  350.             }
  351.  
  352.             @Override
  353.             public void run()
  354.             {
  355.                 indexer.updatePostIndexes();
  356.             }
  357.         }
  358.     }
  359.  
  360. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement