Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package main;
- import java.util.AbstractMap;
- import java.util.ArrayList;
- import java.util.Calendar;
- import java.util.Collections;
- import java.util.Date;
- import java.util.List;
- import java.util.Map;
- import java.util.Random;
- import java.util.Timer;
- import java.util.TimerTask;
- import java.util.Map.Entry;
- import org.elasticsearch.action.bulk.BulkRequestBuilder;
- import org.elasticsearch.action.bulk.BulkResponse;
- import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
- import org.elasticsearch.action.index.IndexRequestBuilder;
- import org.elasticsearch.action.search.SearchRequestBuilder;
- import org.elasticsearch.action.search.SearchResponse;
- import org.elasticsearch.client.Client;
- import org.elasticsearch.common.xcontent.XContentBuilder;
- import org.elasticsearch.common.xcontent.XContentFactory;
- import org.elasticsearch.index.query.FilterBuilders;
- import org.elasticsearch.index.query.QueryBuilder;
- import org.elasticsearch.index.query.QueryBuilders;
- import org.elasticsearch.index.query.QueryStringQueryBuilder.Operator;
- import org.elasticsearch.node.NodeBuilder;
- import org.elasticsearch.search.sort.SortOrder;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import com.google.gson.Gson;
- public class ElasticSearchBug {
- private static String charset = "abcdefghijklmnopqrstuvwxyz";
- public static String word()
- {
- Random rand = new Random();
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < 4; i++)
- {
- int c = rand.nextInt(charset.length());
- sb.append(charset.charAt(c));
- }
- return sb.toString();
- }
- public static String content()
- {
- Random rand = new Random();
- int s = rand.nextInt(30);
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < s; i++)
- {
- sb.append(word());
- }
- return sb.toString();
- }
- public static void main(String[] args) throws InterruptedException
- {
- ElasticSearchIndexerBug es = new ElasticSearchIndexerBug();
- SearchTask st0 = new SearchTask(es);
- InsertTask it0 = new InsertTask(es);
- st0.run();
- it0.run();
- st0.join();
- it0.join();
- }
- private static class SearchTask
- extends Thread {
- private ElasticSearchIndexerBug es;
- private Date date = new Date();
- public SearchTask(ElasticSearchIndexerBug es)
- {
- this.es = es;
- }
- @Override
- public void run()
- {
- for (int i = 0; i < 1000; i++)
- {
- es.search(word(), "TWITTER", date.getTime(), "EN");
- try
- {
- sleep(10);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }
- }
- }
- private static class InsertTask
- extends Thread {
- private ElasticSearchIndexerBug es;
- public InsertTask(ElasticSearchIndexerBug es)
- {
- this.es = es;
- }
- @Override
- public void run()
- {
- for (int i = 0; i < 500; i++)
- {
- PostBug p = new PostBug();
- p.setContent(content());
- p.setUploadDate(Calendar.getInstance());
- p.setLanguage("EN");
- es.addIndex(p);
- try
- {
- sleep(50);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }
- }
- }
- private static class PostBug {
- private String content;
- private Calendar uploadDate;
- private String language;
- public void setContent(String content)
- {
- this.content = content;
- }
- public String getExtendedContent()
- {
- return this.content;
- }
- public void setUploadDate(Calendar uploadDate)
- {
- this.uploadDate = uploadDate;
- }
- public Calendar getUploadDate()
- {
- return this.uploadDate;
- }
- public void setLanguage(String language)
- {
- this.language = language;
- }
- public String getLanguage()
- {
- return this.language;
- }
- }
- private static class ProfileBug {
- }
- private static class ElasticSearchIndexerBug {
- private static final String CONTENT_FIELD = "content";
- private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchIndexerBug.class);
- private int MAX_LIST_SIZE = 1000;
- private int REPEAT_INTERVAL = 1000;
- private static final long EXPIRE_DATE = 20 * 60 * 1000;
- private static final String POSTS_INDEX = "posts";
- private org.elasticsearch.node.Node node;
- private Client client;
- private Timer timer;
- private List<Entry<PostBug, ProfileBug>> postIndex;
- public ElasticSearchIndexerBug()
- {
- postIndex = Collections.synchronizedList(new ArrayList<Entry<PostBug, ProfileBug>>());
- this.node = NodeBuilder.nodeBuilder().node();
- this.client = this.node.client();
- this.timer = new Timer();
- this.timer.schedule(new IndexerTask(this), REPEAT_INTERVAL, REPEAT_INTERVAL);
- }
- public PostBug addIndex(PostBug post)
- {
- LOG.debug("Indexing new post {} without profile", "api");
- try
- {
- synchronized (postIndex)
- {
- Map.Entry<PostBug, ProfileBug> entry = new AbstractMap.SimpleEntry<PostBug, ProfileBug>(post, null);
- postIndex.add(entry);
- }
- if (postIndex.size() > MAX_LIST_SIZE)
- {
- updatePostIndexes();
- }
- }
- catch (Exception e)
- {
- LOG.error("Could not index tweet ", e);
- return null;
- }
- return post;
- }
- public List<Entry<PostBug, ProfileBug>> search(String query, String source, Long lastCollected, String language)
- {
- LOG.debug("Search in elasticsearch using {} for source {}", query, source);
- LOG.debug("Search in elasticsearch using {} language {}", query, language);
- SearchResponse response = null;
- SearchRequestBuilder request = client.prepareSearch(POSTS_INDEX);
- try
- {
- request.setFilter(FilterBuilders.numericRangeFilter("uploadDate").gte(lastCollected));
- QueryBuilder qb;
- if (language.equals("ALL"))
- {
- qb = QueryBuilders.queryString(query).analyzer("standard").defaultField(CONTENT_FIELD).defaultOperator(Operator.AND).phraseSlop(0).allowLeadingWildcard(false);
- }
- else
- {
- qb = QueryBuilders.filteredQuery(QueryBuilders.queryString(query).analyzer("standard").defaultField(CONTENT_FIELD).defaultOperator(Operator.AND).phraseSlop(0).allowLeadingWildcard(false), FilterBuilders.inFilter("language", language.toString().toUpperCase()));
- }
- request.setQuery(qb);
- request.setTypes(source.toString().toUpperCase());
- request.addSort("uploadDate", SortOrder.ASC);
- request.setSize(MAX_LIST_SIZE);
- request.setFrom(0);
- request.setExplain(true);
- LOG.debug("Request {} ", request);
- // Execute get
- response = request.execute().actionGet();
- }
- catch (Exception e)
- {
- LOG.warn("Could not search - {}", e.getMessage());
- e.printStackTrace();
- }
- if (response == null) return new ArrayList<Map.Entry<PostBug, ProfileBug>>(0);
- List<Entry<PostBug, ProfileBug>> result = new ArrayList<Map.Entry<PostBug, ProfileBug>>();
- return result;
- }
- public void updatePostIndexes()
- {
- try
- {
- expireOldData();
- // Bulk index tweets
- BulkRequestBuilder bulkRequest = client.prepareBulk();
- synchronized (postIndex)
- {
- if (this.postIndex.isEmpty()) { return; }
- for (Entry<PostBug, ProfileBug> entry : postIndex)
- {
- Gson postGson = new Gson();
- Gson profileGson = new Gson();
- XContentBuilder content = XContentFactory.jsonBuilder().startObject();
- content.field(CONTENT_FIELD, entry.getKey().getExtendedContent());
- content.field("post", postGson.toJson(entry.getKey()));
- content.field("profile", profileGson.toJson(entry.getValue()));
- if (entry.getKey().getLanguage() == null)
- {
- // language not known
- content.field("language", "ALL");
- }
- else
- {
- content.field("language", entry.getKey().getLanguage().toString().toUpperCase());
- }
- content.field("uploadDate", entry.getKey().getUploadDate().getTimeInMillis());
- content.endObject();
- IndexRequestBuilder request = client.prepareIndex(POSTS_INDEX, "TWITTER");
- request.setSource(content);
- bulkRequest.add(request);
- }
- postIndex.clear();
- }
- BulkResponse bulkResponse = bulkRequest.execute().actionGet();
- if (bulkResponse.hasFailures())
- {
- LOG.warn("Couldnt index all posts {}", bulkResponse.buildFailureMessage());
- }
- client.close();
- }
- catch (Exception e)
- {
- LOG.error("Some error occured while trying to update indexes", e);
- }
- }
- public void expireOldData()
- {
- Calendar oldDate = Calendar.getInstance();
- oldDate.setTimeInMillis(oldDate.getTimeInMillis() - EXPIRE_DATE);
- try
- {
- DeleteByQueryRequestBuilder request = client.prepareDeleteByQuery(POSTS_INDEX);
- request.setQuery(QueryBuilders.rangeQuery("uploadDate").lte(oldDate.getTimeInMillis()));
- request.execute().actionGet();
- }
- catch (Exception e)
- {
- LOG.warn("Could NOT expire old tweets {} ", e.getMessage());
- }
- }
- private class IndexerTask
- extends TimerTask {
- private ElasticSearchIndexerBug indexer;
- public IndexerTask(ElasticSearchIndexerBug elasticSearchIndexerBug)
- {
- this.indexer = elasticSearchIndexerBug;
- }
- @Override
- public void run()
- {
- indexer.updatePostIndexes();
- }
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement