Not a member of Pastebin yet?
Sign up — it unlocks many cool features!
- package ****.elasticsearch.index;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.atomic.AtomicInteger;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.elasticsearch.action.ActionListener;
- import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
- import org.elasticsearch.action.bulk.BulkResponse;
- import org.elasticsearch.action.delete.DeleteResponse;
- import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
- import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
- import org.elasticsearch.action.get.GetRequest;
- import org.elasticsearch.action.get.GetResponse;
- import org.elasticsearch.action.index.IndexRequest;
- import org.elasticsearch.action.index.IndexResponse;
- import org.elasticsearch.action.search.SearchResponse;
- import org.elasticsearch.client.Requests;
- import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
- import org.elasticsearch.client.action.delete.DeleteRequestBuilder;
- import org.elasticsearch.client.action.get.GetRequestBuilder;
- import org.elasticsearch.client.action.search.SearchRequestBuilder;
- import org.elasticsearch.index.query.MatchAllQueryBuilder;
- import org.elasticsearch.index.query.QueryStringQueryBuilder;
- import org.elasticsearch.search.SearchHit;
- import org.elasticsearch.search.SearchHits;
- import org.springframework.beans.factory.annotation.Autowired;
- import ****.index.bean.JsonBean;
- import ****.index.compass.IIndexing;
- import ****.index.jdbc.BuildIndex;
- import ****.util.CSUtils;
- public abstract class BaseESIndexer implements IIndexing<JsonBean>{
- private final static Log LOG = LogFactory.getLog(BaseESIndexer.class);
- @Autowired
- protected ESNodeWrapper esNode;
- protected String indexName = null;
- protected int bulkSize = 100;
- protected int dropThreshold = 10;
- protected final AtomicInteger onGoingBulks = new AtomicInteger();
- protected int bulkOperations = 0;
- /**
- * This method should be used to initialize the Indexer once the ESNodeWrapper has been added.
- * @throws IOException
- */
- protected void init() {
- // new indexes should only be created by (nightly, weekly) indexing processes
- /* try {
- OpenIndexResponse resp = esNode.getClient()
- .admin()
- .indices().prepareOpen(indexName).execute().actionGet();
- } catch (ElasticSearchException e) {
- LOG.info("Error opening Index: " + indexName + ", will now create that index.");
- e.printStackTrace();
- // create index if one is not already present.
- ESAdmin.createIndex(esNode, indexName);
- } */
- }
- public void clear() {
- DeleteByQueryResponse deleteResp = esNode.getClient().prepareDeleteByQuery(indexName)
- .setQuery(new MatchAllQueryBuilder())
- .execute()
- .actionGet();
- }
- public Map load(Integer contactId) {
- String queryStr = "id:" + contactId;
- QueryStringQueryBuilder queryBuilder = new QueryStringQueryBuilder(queryStr);
- SearchRequestBuilder searchReq = esNode.getClient().prepareSearch().setIndices(this.getIndexName())
- .setQuery(queryBuilder);
- SearchResponse searchResponse = searchReq.execute().actionGet();
- SearchHit[] hits = searchResponse.getHits().getHits();
- if (hits == null || hits.length == 0){
- return null;
- }
- Map<String, Object> result = hits[0].getSource();
- return result;
- }
- public void remove(int id) {
- LOG.info("Starting to remove id[" + id + "]");
- String queryStr = "id:" + id;
- QueryStringQueryBuilder queryBuilder = new QueryStringQueryBuilder(queryStr);
- DeleteByQueryResponse deleteResp = esNode.getClient().prepareDeleteByQuery(indexName)
- .setQuery(queryBuilder)
- .execute().actionGet();
- Map<String, IndexDeleteByQueryResponse> indexMap = deleteResp.indices();
- if (indexMap == null){
- return;
- }
- for (String index : indexMap.keySet()){
- IndexDeleteByQueryResponse resp = indexMap.get(index);
- if (resp.failedShards() > 0){
- LOG.warn("Error deleting id[" + id +"]");
- }
- }
- LOG.info("Finished removing id[" + id + "]");
- }
- public void refresh() {
- RefreshRequest request = new RefreshRequest(indexName);
- esNode.getClient().admin().indices().refresh(request ).actionGet();
- }
- /**
- * This should only be used for tens or at most hundreds of deletes.
- * For more than this use removeBatchBulk.
- */
- public void removeBatch(Collection<Integer> ids) {
- for (int id : ids){
- Map indexObject = load(id);
- if (indexObject != null){
- remove(id);
- }
- }
- }
- public void removeBatchBulk(Collection<Integer> ids) {
- LOG.info("Starting removeBatch of " + ids.size() + " objects on " + getIndexName());
- BulkRequestBuilder bulkOperation = esNode.getClient().prepareBulk();
- for (int id : ids){
- Map indexObject = load(id);
- if (indexObject != null){
- String type = null;// ????? where to get this in a delete operation !!!
- Boolean createNew = bulkDelete(id, bulkOperation, type);
- if (createNew){
- bulkOperation = esNode.getClient().prepareBulk();
- }
- }
- }
- finishBulk(bulkOperation, "removeBatch");
- LOG.info("Finished removeBatch of " + ids.size() + " objects on " + getIndexName());
- }
- public void saveBatch(Collection<JsonBean> beans) {
- LOG.info("Starting saveBatch Indexing " + beans.size() + " " + getIndexName() + ", documents.");
- BulkRequestBuilder bulkOperation = esNode.getClient().prepareBulk();
- for (JsonBean jsonBean: beans){
- Boolean createNew = bulkSave(jsonBean, bulkOperation);
- if (createNew){
- bulkOperation = esNode.getClient().prepareBulk();
- }
- }
- // index the stragglers remaining
- finishBulk(bulkOperation, "saveBatch");
- LOG.info("Finished saveBatch indexing threads " + beans.size() + " " + getIndexName() + ", documents.");
- }
- private void finishBulk(BulkRequestBuilder bulkOperation, String desc) {
- int oldBulkSize = bulkSize;
- bulkSize = 1;
- LOG.info(desc + " Index remaining " + bulkOperation.numberOfActions() + " " + getIndexName() + ", operations.");
- processBulkIfNeeded(bulkOperation, bulkOperation.numberOfActions() + bulkOperations);
- bulkSize = oldBulkSize;
- }
- public void save(JsonBean party) {
- IndexResponse postResponse = esNode.getClient().prepareIndex()
- .setIndex(indexName)
- .setType(party.getIndexType())
- .setId(""+party.getId())
- .setSource(party.getJson())
- .setRefresh(true)
- .execute().actionGet();
- }
- public void saveOrUpdateBatch(Collection<JsonBean> parties) {
- LOG.info("Starting saveOrUpdateBatch Indexing of" + parties.size() + " " + getIndexName() + ", documents.");
- BulkRequestBuilder bulkOperation = esNode.getClient().prepareBulk();
- for (JsonBean jsonBean: parties){
- GetResponse getResponse = esNode.getClient()
- .prepareGet(jsonBean.getIndexName(), jsonBean.getIndexType(), "" + jsonBean.getId())
- .execute().actionGet();
- if (getResponse != null){
- Boolean createNew = bulkDelete(jsonBean, bulkOperation);
- if (createNew){
- bulkOperation = esNode.getClient().prepareBulk();
- }
- }
- Boolean createNew = bulkSave(jsonBean, bulkOperation);
- if (createNew){
- bulkOperation = esNode.getClient().prepareBulk();
- }
- }
- // index the stragglers remaining
- int oldBulkSize = bulkSize;
- bulkSize = 1;
- LOG.info("saveOrUpdateBatch Index remaining " + bulkOperation.numberOfActions() + " " + getIndexName() + ", operations.");
- processBulkIfNeeded(bulkOperation, bulkOperation.numberOfActions() + bulkOperations);
- bulkSize = oldBulkSize;
- LOG.info("Finished starting indexing threads " + parties.size() + " " + getIndexName() + ", documents.");
- }
- public void saveOrUpdate(JsonBean party) {
- /*CompassSession cmpSess = null;
- CompassTransaction tx = null;
- try
- {
- cmpSess = compass.openSession();
- tx = cmpSess.beginTransaction();
- try{
- cmpSess.load("accountindex", party.getParty_id());
- cmpSess.delete("accountindex", party.getParty_id());
- }catch(CompassException ignore){}
- cmpSess.save((AccountIndex)party);
- tx.commit();
- } catch (CompassException ce) {
- handleException(tx, ce);
- }finally{
- cmpSess.close();
- }*/
- }
- /*@Override
- public CompassHits search(String query){
- CompassSession cmpSess = null;
- CompassHits hits = null;
- if (query!=null){
- try
- {
- cmpSess = compass.openSession();
- hits = cmpSess.find(query);
- } catch (CompassException ce) {
- LOG.error("searchExtParty raises exception: ", ce);
- }finally{
- cmpSess.close();
- }
- }
- return hits;
- }*/
- public List<JsonBean> searchList(String query){
- List<JsonBean> results = new ArrayList<JsonBean>();
- /*CompassSession cmpSess = null;
- if (query!=null){
- try
- {
- cmpSess = compass.openSession();
- CompassHits hits = cmpSess.find(query);
- for (int i = 0; i < hits.getLength(); i++) {
- results.add((AccountIndex) hits.data(i));
- }
- } catch (CompassException ce) {
- LOG.error("searchIoi raises exception: ", ce);
- }finally{
- cmpSess.close();
- }
- }*/
- return results;
- }
- public List<JsonBean> loadAllList() {
- /*ArrayList<String> results = new ArrayList<String>();
- SearchResponse response = esNode.getClient().prepareSearch(indexName).setSearchType(indexType).execute().actionGet();
- SearchHits hitsObj = response.getHits();
- return hitsObj;*/
- throw new UnsupportedOperationException("We don't want to return millions of results in an array list should query through SearchHits API");
- }
- /*@Override
- public CompassHits loadAll() {
- CompassHits results = search("alias:accountindex");
- return results;
- }*/
- public void CUDParties(List<Map<String, Object>> contactsLst) {
- // TODO Auto-generated method stub
- }
- public Boolean bulkDelete(Integer id, BulkRequestBuilder bulkOperation, String type) {
- DeleteRequestBuilder del = esNode.getClient().prepareDelete().setIndex(getIndexName()).setId(id.toString()).setType(type);
- bulkOperation.add(del);
- Boolean createNewBulkOperation = processBulkIfNeeded(bulkOperation, bulkOperations++);
- return createNewBulkOperation;
- }
- public Boolean bulkDelete(JsonBean jsonBean, BulkRequestBuilder bulkOperation) {
- DeleteRequestBuilder del = esNode.getClient().prepareDelete(indexName, jsonBean.getIndexType(), "" + jsonBean.getId());
- bulkOperation.add(del);
- Boolean createNewBulkOperation = processBulkIfNeeded(bulkOperation, bulkOperations++);
- return createNewBulkOperation;
- }
- public Boolean bulkSave(JsonBean jsonBean, BulkRequestBuilder bulkOperation) {
- IndexRequest indexOperation = Requests.indexRequest(indexName)
- .type(jsonBean.getIndexType())
- .id("" + jsonBean.getId())
- .create(false)
- .source(jsonBean.getJson());
- bulkOperation.add(indexOperation);
- Boolean createNewBulkOperation = processBulkIfNeeded(bulkOperation, bulkOperations++);
- return createNewBulkOperation;
- }
- private Boolean processBulkIfNeeded(BulkRequestBuilder bulkOperation, int bulkOperations) {
- Boolean createNewBulkReq = false;
- if (bulkOperation.numberOfActions() >= bulkSize) {
- // execute the bulk operation
- int currentOnGoingBulks = onGoingBulks.incrementAndGet();
- LOG.info("Ongoing Bulks = " + currentOnGoingBulks + ", Index Operations = " + bulkOperations );
- if (currentOnGoingBulks > dropThreshold) {
- // TODO, just wait here!, we can slow down the parsing
- onGoingBulks.decrementAndGet();
- String message = "dropping bulk, " + onGoingBulks + " crossed threshold " + dropThreshold + ", Index Operations = " + bulkOperations;
- LOG.error(message);
- registerError(new Exception(message));
- } else {
- try {
- final int bulkNo = onGoingBulks.get();
- LOG.info("Executing Bulk Request " + bulkNo + " for index " + getIndexName());
- bulkOperation.execute(new ActionListener<BulkResponse>() {
- @Override public void onResponse(BulkResponse bulkResponse) {
- onGoingBulks.decrementAndGet();
- LOG.info("Bulk [" + bulkNo + "] of " + getIndexName() + " Index Executed");
- if (bulkResponse.hasFailures()){
- RuntimeException indexingException = new RuntimeException(bulkResponse.buildFailureMessage());
- registerError(indexingException);
- }
- }
- @Override public void onFailure(Throwable indexingException) {
- Exception exception = new Exception(indexingException);
- registerError(exception);
- }
- });
- } catch (Exception indexingException) {
- registerError(indexingException);
- }
- }
- // once we have executed a bulk request, create a new one for adding
- // a fresh set of bulk updates, deletes, additions etc.
- createNewBulkReq = true;
- }
- return createNewBulkReq;
- }
- private void registerError(Exception indexingException) {
- BuildIndex.statusFlag = BuildIndex.ERROR;
- BuildIndex.exception = indexingException;
- LOG.error("Indexing Exception", indexingException);
- }
- public String getIndexName() {
- return indexName;
- }
- public void setIndexName(String indexName) {
- this.indexName = indexName;
- }
- public int getBulkSize() {
- return bulkSize;
- }
- public void setBulkSize(int bulkSize) {
- this.bulkSize = bulkSize;
- }
- public int getDropThreshold() {
- return dropThreshold;
- }
- public void setDropThreshold(int dropThreshold) {
- this.dropThreshold = dropThreshold;
- }
- public AtomicInteger getOnGoingBulks() {
- return onGoingBulks;
- }
- public ESNodeWrapper getEsNode() {
- return esNode;
- }
- }
- ====================================================================================================================
- package ****.index.bean;
/**
 * Carrier for one document to be indexed: its JSON source together with the id, index name
 * and type it is stored under. A plain mutable bean; all fields may be changed via setters.
 */
public class JsonBean {

    private String json;
    private int id;
    private String indexName;
    private String indexType;

    /** Creates an empty bean: every string field is {@code null} and the id is {@code 0}. */
    public JsonBean() {
        this(null, 0, null, null);
    }

    /**
     * Creates a fully populated bean.
     *
     * @param json      the document source as a JSON string
     * @param id        the document id
     * @param indexName the index the document belongs to
     * @param indexType the document type within that index
     */
    public JsonBean(String json, int id, String indexName, String indexType) {
        setJson(json);
        setId(id);
        setIndexName(indexName);
        setIndexType(indexType);
    }

    // ---- accessors ----

    public String getJson() { return this.json; }

    public int getId() { return this.id; }

    public String getIndexName() { return this.indexName; }

    public String getIndexType() { return this.indexType; }

    // ---- mutators ----

    public void setJson(String json) { this.json = json; }

    public void setId(int id) { this.id = id; }

    public void setIndexName(String indexName) { this.indexName = indexName; }

    public void setIndexType(String indexType) { this.indexType = indexType; }
}
- ===============================================================================================================================
- package ****.elasticsearch.buildIndex;
- import static ****.elasticsearch.util.ESConstants.ALIAS_CONTACT;
- import static ****.elasticsearch.util.ESConstants.INDEX_CONTACT;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.List;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.codehaus.jackson.JsonGenerationException;
- import org.codehaus.jackson.map.JsonMappingException;
- import org.codehaus.jackson.map.ObjectMapper;
- import org.springframework.beans.factory.annotation.Autowired;
- import ****.elasticsearch.index.ESContactIndexer;
- import ****.elasticsearch.index.IndexCleanup;
- import ****.elasticsearch.util.ESAdmin;
- import ****.index.bean.Contact;
- import ****.index.bean.ContactJSONWrapper;
- import ****.index.bean.JsonBean;
- import ****.index.compass.QueuedIndexingThread;
- import ****.index.jdbc.BaseContactJDBCIndexer;
- import ****.index.jdbc.BuildIndex;
- import ****.index.jdbc.IntegerRowMapper;
- import ****.index.jdbc.JDBCIndexer;
- import ****.index.jdbc.JDBCQueries;
- import ****.index.jdbc.JDBCQueryHelper;
- import ****.util.CSUtils;
- public class ESContactJDBCIndexer extends BaseContactJDBCIndexer implements JDBCIndexer, ElasticSearchIndexer{ //JdbcDaoSupport{
- final static Log LOG = LogFactory.getLog(ESContactJDBCIndexer.class);
- QueuedIndexingThread<JsonBean> indexThread = new QueuedIndexingThread<JsonBean>("ESContact Indexer");
- @Autowired(required=true) ESContactIndexer esContactIndexing;
- public void init() {
- }
- public void doIndex() throws Exception {
- IndexCleanup.cleanup(esContactIndexing.getEsNode(), "Contact JDBC");
- Thread.sleep(1000);
- String oldIndexName = getOldIndexName();
- LOG.info("Old Index Name[" + oldIndexName + "]");
- String newIndexName = createNewIndex();
- LOG.info("New Index Name[" + newIndexName + "]");
- esContactIndexing.setIndexName(newIndexName);
- // This is the big data one
- getDBDataAndCreateNewIndex();
- LOG.info("Removing alias [" + ALIAS_CONTACT + "] for [" + oldIndexName + "]");
- removeAliasOldIndex(oldIndexName, ALIAS_CONTACT);
- LOG.info("Adding alias [" + ALIAS_CONTACT + "] for [" + newIndexName + "]");
- aliasNewIndex(newIndexName, ALIAS_CONTACT);
- //?????? runWarmUpQueries ????? // Lucene 3.0.3 (I think) needs one-off warm up queries.
- Thread.sleep(1000);
- LOG.info("Deleting index [" + oldIndexName + "]");
- deleteOldIndex(oldIndexName);
- }
- @Override
- public void removeAliasOldIndex(String oldIndexName, String aliasContact) {
- ESAdmin.removeAlias(esContactIndexing.getEsNode(), oldIndexName, aliasContact);
- }
- @Override
- public void aliasNewIndex(String newIndexName, String aliasContact) {
- ESAdmin.addAlias(esContactIndexing.getEsNode(), newIndexName, aliasContact);
- }
- @Override
- public void deleteOldIndex(String oldIndexName) {
- ESAdmin.deleteIndex(esContactIndexing.getEsNode(), oldIndexName);
- }
- @Override
- public String createNewIndex() {
- return EsJdbcHelper.createNewIndexOfType(INDEX_CONTACT, esContactIndexing.getEsNode());
- }
- @Override
- public String getOldIndexName() {
- return EsJdbcHelper.getOldIndexNameOfType(INDEX_CONTACT, esContactIndexing.getEsNode());
- }
- @Override
- public void getDBDataAndCreateNewIndex() throws Exception, InterruptedException {
- List<Integer> allPersonPartyIds = getPersonIds();
- int size = allPersonPartyIds.size();
- if(size==0) return;
- indexThread.setIndexingTools(esContactIndexing);
- indexThread.setStopWhenEmpty(false);
- indexThread.setUpdateMode(false);
- LOG.info("*** Number of contacts to Index = " + size);
- // set chunk size to give about 5000 documents per chunk
- int numberOfChunks = size / 5000;
- LOG.info("numberOfChunks = " + numberOfChunks);
- List<JDBCQueryHelper.HiLowId> idBetweenList = JDBCQueryHelper.createBetweenIdList(allPersonPartyIds, numberOfChunks);
- int count = 0;
- for (JDBCQueryHelper.HiLowId hiLo: idBetweenList){
- if (BuildIndex.statusFlag == BuildIndex.ERROR){
- LOG.error("Error In Indexbuilding, stopping now.", BuildIndex.exception);
- return;
- }
- Collection<Contact> parties = mapContacts(hiLo);
- indexCollection(parties);
- ESAdmin.getStats(esContactIndexing.getEsNode(), LOG);
- LOG.info("Pass number " + ++count + " executed.");
- }
- Thread.sleep(2000);
- indexThread.setStopWhenEmpty(true);
- try {
- Thread.sleep(5000);
- while (indexThread.isStillRunning()){
- Thread.sleep(5000);
- }
- } catch (InterruptedException e) {}
- }
- public List<Integer> getPersonIds() {
- List <Integer> ids = getJdbcTemplate().query(
- JDBCQueries.GET_ALL_PERSON_IDS_PROC, new IntegerRowMapper());
- return ids;
- }
- public void setExtPartyIndexing(ESContactIndexer contactIndexing) {
- this.esContactIndexing = contactIndexing;
- }
- public void doIndexLimited(String contactIds) throws Exception {
- esContactIndexing.setIndexName(ALIAS_CONTACT);
- LOG.info("Contact Ids[" + contactIds + "]");
- if(contactIds==null || "".equals(contactIds)) {
- LOG.warn("Empty contactIds!! Returning now.");
- return;
- }
- Collection<Contact> parties = mapContactsLimited(contactIds);
- List<JsonBean> jsonList = getCollectionAsJsonBeanList(parties);
- esContactIndexing.saveOrUpdateBatch(jsonList);
- // refresh the index before returning, we do it TWICE deliberately
- esContactIndexing.refresh();
- esContactIndexing.refresh();
- }
- void indexCollection(Collection<Contact> parties) throws Exception {
- List<JsonBean> jsonList = getCollectionAsJsonBeanList(parties);
- LOG.info("created " + jsonList.size() + " contact json beans.");
- indexJSONCollection(jsonList);
- }
- private List<JsonBean> getCollectionAsJsonBeanList(
- Collection<Contact> parties) throws IOException,
- JsonGenerationException, JsonMappingException {
- List<JsonBean> jsonList = new ArrayList<JsonBean>(parties.size());
- int i = 0;
- for (Contact contact : parties){
- ContactJSONWrapper wrapperForLessVerboseJson = new ContactJSONWrapper(contact);
- String json1 = CSUtils.mapper().writeValueAsString(wrapperForLessVerboseJson);
- if (json1 != null){
- if (i == 0){
- LOG.info("contact json = " + json1);
- i++;
- }
- JsonBean jsonBean = new JsonBean(json1, contact.getPartyId(), esContactIndexing.getIndexName() , contact.getType());
- jsonList.add(jsonBean);
- }
- }
- return jsonList;
- }
- private void indexJSONCollection(Collection<JsonBean> parties) {
- indexThread.addToIndexQueAndIncrementBatchNumber(parties);
- if (!indexThread.isStarted()){
- indexThread.start();
- }
- }
- }
- ==============================================================================================================================
- package ****.index.compass;
- import java.util.Collection;
- import java.util.Collections;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.concurrent.atomic.AtomicInteger;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- public class QueuedIndexingThread<T> extends Thread {
- private String description;
- /**
- * Use this if you know how many batches you need to index at instatiation
- * time. - use "addToIndexQueAndIncrementBatchNumber" when adding 'chunks'
- * to index. - do not use "addToIndexQue"
- *
- * - also this way seems to be slower than using predefined chunk size,
- * pehaps because we are changing variable values in the middle of a thread
- * causing more context switching.
- *
- * @param numberOfBatchesToIndex
- */
- public QueuedIndexingThread(String description) {
- this.numberOfBatchesToIndex = 0;
- this.isPreConfiguredChunkSize = false;
- this.description = description;
- }
- /**
- * Use this if you know how many batches you need to index at instatiation
- * time. - use "addToIndexQue" when adding 'chunks' to index. - do not use
- * not "addToIndexQueAndIncrementBatchNumber"
- *
- * @param numberOfBatchesToIndex
- */
- public QueuedIndexingThread(int numberOfBatchesToIndex, String description) {
- this.numberOfBatchesToIndex = numberOfBatchesToIndex;
- this.isPreConfiguredChunkSize = true;
- this.description = description;
- }
- private final static Log LOG = LogFactory.getLog(QueuedIndexingThread.class);
- private IIndexing<T> indexing;
- int numberOfBatchesToIndex = 0;
- private boolean isStarted = false;
- private boolean doNotStop = true;
- private boolean stopWhenEmpty = true;
- private Boolean isPreConfiguredChunkSize;
- private Boolean updateMode;
- private int countIndexesFinished;
- public Boolean getUpdateMode() {
- return updateMode;
- }
- public void setUpdateMode(Boolean updateMode) {
- this.updateMode = updateMode;
- }
- private LinkedList<Collection<T>> que = new LinkedList<Collection<T>>();
- @Override
- public void run() {
- isStarted = true;
- LOG.info("Starting Indexing" + description);
- while (doNotStop) {
- try {
- Collection<T> indexAble = que.poll();
- if ( indexAble != null) {
- LOG.info( String.format( "Starting Indexing [ %s ] Batch number [ %s ]", description, ++countIndexesFinished ) );
- if (updateMode) {
- indexing.saveOrUpdateBatch(indexAble);
- } else {
- // do not do update when indexing re-index by indexing an new index and deleting the
- // old one then alias the new index to a constant alias name.
- indexing.saveBatch(indexAble);
- }
- indexAble.clear();
- indexAble = null;
- LOG.info( String.format( "Finished Indexing [ %s ] Batch number [ %s ]", description, countIndexesFinished ) );
- }
- Thread.sleep(1000);
- doNotStop = (countIndexesFinished < numberOfBatchesToIndex) || !stopWhenEmpty || que.size() > 0;
- } catch (InterruptedException ex) {
- throw new RuntimeException(ex);
- }
- }
- LOG.info("Shutting down Indexer" + this);
- try {
- Thread.sleep(25000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- public void addToIndexQue(Collection<T> indexableChunk) {
- if (!isPreConfiguredChunkSize) {
- throw new IllegalStateException("If you have not pre-defined your chunk size use 'addToIndexQueAndIncrementBatchNumber' to add chunks");
- }
- que.add(indexableChunk);
- }
- /**
- *
- * Note - this way seems to be slower than using predefined chunk size,
- * pehaps because we are changing variable values in the middle of a thread
- * causing more context switching.
- *
- * @param indexableChunk
- */
- public void addToIndexQueAndIncrementBatchNumber(Collection<T> indexableChunk) {
- if (isPreConfiguredChunkSize) {
- throw new IllegalStateException( "If you have pre-defined your chunk size use 'addToIndexQue' to add chunks");
- }
- this.numberOfBatchesToIndex++;
- que.add(indexableChunk);
- LOG.info(String.format(" ADDED BATCH [ %s ] ", this.numberOfBatchesToIndex) );
- }
- public boolean isStarted() {
- return isStarted;
- }
- public void setIndexingTools(IIndexing<T> indexing) {
- this.indexing = indexing;
- }
- public boolean isStillRunning() {
- return doNotStop;
- }
- public String stopStatus() {
- return
- String.format( "doNotStop [ %s ] countIndexesFinished < numberOfBatchesToIndex [ %s ], !stopWhenEmpty [ %s ], que.size [ %s ]",
- doNotStop, (countIndexesFinished < numberOfBatchesToIndex), !stopWhenEmpty, que.size() );
- }
- /**
- * Set this to false to keep the Thread going because we expect more data to
- * be added later. - once all data has been added you must manually set this
- * back to true or the thread will continue for ever.
- *
- * @param stopWhenEmpty
- */
- public void setStopWhenEmpty(boolean stopWhenEmpty) {
- this.stopWhenEmpty = stopWhenEmpty;
- }
- }
- ======================================================================================================
Add Comment
Please sign in to add a comment.