Guest User

Untitled

a guest
Oct 19th, 2018
88
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 26.42 KB | None | 0 0
  1. package ****.elasticsearch.index;
  2.  
  3. import java.io.IOException;
  4. import java.util.ArrayList;
  5. import java.util.Collection;
  6. import java.util.List;
  7. import java.util.Map;
  8. import java.util.concurrent.atomic.AtomicInteger;
  9.  
  10. import org.apache.commons.logging.Log;
  11. import org.apache.commons.logging.LogFactory;
  12. import org.elasticsearch.action.ActionListener;
  13. import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
  14. import org.elasticsearch.action.bulk.BulkResponse;
  15. import org.elasticsearch.action.delete.DeleteResponse;
  16. import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
  17. import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
  18. import org.elasticsearch.action.get.GetRequest;
  19. import org.elasticsearch.action.get.GetResponse;
  20. import org.elasticsearch.action.index.IndexRequest;
  21. import org.elasticsearch.action.index.IndexResponse;
  22. import org.elasticsearch.action.search.SearchResponse;
  23. import org.elasticsearch.client.Requests;
  24. import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
  25. import org.elasticsearch.client.action.delete.DeleteRequestBuilder;
  26. import org.elasticsearch.client.action.get.GetRequestBuilder;
  27. import org.elasticsearch.client.action.search.SearchRequestBuilder;
  28. import org.elasticsearch.index.query.MatchAllQueryBuilder;
  29. import org.elasticsearch.index.query.QueryStringQueryBuilder;
  30. import org.elasticsearch.search.SearchHit;
  31. import org.elasticsearch.search.SearchHits;
  32. import org.springframework.beans.factory.annotation.Autowired;
  33.  
  34. import ****.index.bean.JsonBean;
  35. import ****.index.compass.IIndexing;
  36. import ****.index.jdbc.BuildIndex;
  37. import ****.util.CSUtils;
  38.  
  39. public abstract class BaseESIndexer implements IIndexing<JsonBean>{
  40.  
  41. private final static Log LOG = LogFactory.getLog(BaseESIndexer.class);
  42.  
  43. @Autowired
  44. protected ESNodeWrapper esNode;
  45.  
  46. protected String indexName = null;
  47.  
  48. protected int bulkSize = 100;
  49. protected int dropThreshold = 10;
  50. protected final AtomicInteger onGoingBulks = new AtomicInteger();
  51.  
  52. protected int bulkOperations = 0;
  53.  
  54. /**
  55. * This method should be used to initialize the Indexer once the ESNodeWrapper has been added.
  56. * @throws IOException
  57. */
  58. protected void init() {
  59.  
  60. // new indexes should only be created by (nightly, weekly) indexing processes
  61.  
  62. /* try {
  63. OpenIndexResponse resp = esNode.getClient()
  64. .admin()
  65. .indices().prepareOpen(indexName).execute().actionGet();
  66. } catch (ElasticSearchException e) {
  67. LOG.info("Error opening Index: " + indexName + ", will now create that index.");
  68. e.printStackTrace();
  69. // create index if one is not already present.
  70. ESAdmin.createIndex(esNode, indexName);
  71. } */
  72.  
  73. }
  74.  
  75. public void clear() {
  76.  
  77. DeleteByQueryResponse deleteResp = esNode.getClient().prepareDeleteByQuery(indexName)
  78. .setQuery(new MatchAllQueryBuilder())
  79. .execute()
  80. .actionGet();
  81. }
  82.  
  83. public Map load(Integer contactId) {
  84. String queryStr = "id:" + contactId;
  85. QueryStringQueryBuilder queryBuilder = new QueryStringQueryBuilder(queryStr);
  86. SearchRequestBuilder searchReq = esNode.getClient().prepareSearch().setIndices(this.getIndexName())
  87. .setQuery(queryBuilder);
  88.  
  89. SearchResponse searchResponse = searchReq.execute().actionGet();
  90.  
  91. SearchHit[] hits = searchResponse.getHits().getHits();
  92.  
  93. if (hits == null || hits.length == 0){
  94. return null;
  95. }
  96.  
  97. Map<String, Object> result = hits[0].getSource();
  98.  
  99. return result;
  100. }
  101.  
  102.  
  103. public void remove(int id) {
  104. LOG.info("Starting to remove id[" + id + "]");
  105.  
  106. String queryStr = "id:" + id;
  107. QueryStringQueryBuilder queryBuilder = new QueryStringQueryBuilder(queryStr);
  108.  
  109. DeleteByQueryResponse deleteResp = esNode.getClient().prepareDeleteByQuery(indexName)
  110. .setQuery(queryBuilder)
  111. .execute().actionGet();
  112.  
  113. Map<String, IndexDeleteByQueryResponse> indexMap = deleteResp.indices();
  114. if (indexMap == null){
  115. return;
  116. }
  117. for (String index : indexMap.keySet()){
  118. IndexDeleteByQueryResponse resp = indexMap.get(index);
  119. if (resp.failedShards() > 0){
  120. LOG.warn("Error deleting id[" + id +"]");
  121. }
  122. }
  123.  
  124. LOG.info("Finished removing id[" + id + "]");
  125.  
  126. }
  127.  
  128. public void refresh() {
  129. RefreshRequest request = new RefreshRequest(indexName);
  130. esNode.getClient().admin().indices().refresh(request ).actionGet();
  131. }
  132.  
  133.  
  134. /**
  135. * This should only be used for tens or at most hundreds of deletes.
  136. * For more than this use removeBatchBulk.
  137. */
  138. public void removeBatch(Collection<Integer> ids) {
  139.  
  140. for (int id : ids){
  141. Map indexObject = load(id);
  142. if (indexObject != null){
  143. remove(id);
  144. }
  145. }
  146.  
  147. }
  148.  
  149. public void removeBatchBulk(Collection<Integer> ids) {
  150.  
  151. LOG.info("Starting removeBatch of " + ids.size() + " objects on " + getIndexName());
  152.  
  153. BulkRequestBuilder bulkOperation = esNode.getClient().prepareBulk();
  154.  
  155. for (int id : ids){
  156. Map indexObject = load(id);
  157. if (indexObject != null){
  158. String type = null;// ????? where to get this in a delete operation !!!
  159. Boolean createNew = bulkDelete(id, bulkOperation, type);
  160. if (createNew){
  161. bulkOperation = esNode.getClient().prepareBulk();
  162. }
  163. }
  164. }
  165.  
  166. finishBulk(bulkOperation, "removeBatch");
  167. LOG.info("Finished removeBatch of " + ids.size() + " objects on " + getIndexName());
  168. }
  169.  
  170.  
  171. public void saveBatch(Collection<JsonBean> beans) {
  172.  
  173. LOG.info("Starting saveBatch Indexing " + beans.size() + " " + getIndexName() + ", documents.");
  174.  
  175. BulkRequestBuilder bulkOperation = esNode.getClient().prepareBulk();
  176.  
  177. for (JsonBean jsonBean: beans){
  178. Boolean createNew = bulkSave(jsonBean, bulkOperation);
  179. if (createNew){
  180. bulkOperation = esNode.getClient().prepareBulk();
  181. }
  182. }
  183. // index the stragglers remaining
  184. finishBulk(bulkOperation, "saveBatch");
  185.  
  186. LOG.info("Finished saveBatch indexing threads " + beans.size() + " " + getIndexName() + ", documents.");
  187. }
  188.  
  189. private void finishBulk(BulkRequestBuilder bulkOperation, String desc) {
  190. int oldBulkSize = bulkSize;
  191. bulkSize = 1;
  192. LOG.info(desc + " Index remaining " + bulkOperation.numberOfActions() + " " + getIndexName() + ", operations.");
  193. processBulkIfNeeded(bulkOperation, bulkOperation.numberOfActions() + bulkOperations);
  194. bulkSize = oldBulkSize;
  195. }
  196.  
  197.  
  198. public void save(JsonBean party) {
  199.  
  200. IndexResponse postResponse = esNode.getClient().prepareIndex()
  201. .setIndex(indexName)
  202. .setType(party.getIndexType())
  203. .setId(""+party.getId())
  204. .setSource(party.getJson())
  205. .setRefresh(true)
  206. .execute().actionGet();
  207. }
  208.  
  209.  
  210. public void saveOrUpdateBatch(Collection<JsonBean> parties) {
  211.  
  212. LOG.info("Starting saveOrUpdateBatch Indexing of" + parties.size() + " " + getIndexName() + ", documents.");
  213.  
  214. BulkRequestBuilder bulkOperation = esNode.getClient().prepareBulk();
  215.  
  216. for (JsonBean jsonBean: parties){
  217. GetResponse getResponse = esNode.getClient()
  218. .prepareGet(jsonBean.getIndexName(), jsonBean.getIndexType(), "" + jsonBean.getId())
  219. .execute().actionGet();
  220. if (getResponse != null){
  221. Boolean createNew = bulkDelete(jsonBean, bulkOperation);
  222. if (createNew){
  223. bulkOperation = esNode.getClient().prepareBulk();
  224. }
  225. }
  226. Boolean createNew = bulkSave(jsonBean, bulkOperation);
  227. if (createNew){
  228. bulkOperation = esNode.getClient().prepareBulk();
  229. }
  230. }
  231. // index the stragglers remaining
  232. int oldBulkSize = bulkSize;
  233. bulkSize = 1;
  234. LOG.info("saveOrUpdateBatch Index remaining " + bulkOperation.numberOfActions() + " " + getIndexName() + ", operations.");
  235. processBulkIfNeeded(bulkOperation, bulkOperation.numberOfActions() + bulkOperations);
  236. bulkSize = oldBulkSize;
  237.  
  238. LOG.info("Finished starting indexing threads " + parties.size() + " " + getIndexName() + ", documents.");
  239. }
  240.  
  241.  
  242. public void saveOrUpdate(JsonBean party) {
  243.  
  244.  
  245.  
  246. /*CompassSession cmpSess = null;
  247. CompassTransaction tx = null;
  248. try
  249. {
  250. cmpSess = compass.openSession();
  251. tx = cmpSess.beginTransaction();
  252.  
  253. try{
  254. cmpSess.load("accountindex", party.getParty_id());
  255. cmpSess.delete("accountindex", party.getParty_id());
  256. }catch(CompassException ignore){}
  257. cmpSess.save((AccountIndex)party);
  258. tx.commit();
  259. } catch (CompassException ce) {
  260. handleException(tx, ce);
  261. }finally{
  262. cmpSess.close();
  263. }*/
  264. }
  265.  
  266. /*@Override
  267. public CompassHits search(String query){
  268.  
  269. CompassSession cmpSess = null;
  270. CompassHits hits = null;
  271.  
  272. if (query!=null){
  273. try
  274. {
  275. cmpSess = compass.openSession();
  276. hits = cmpSess.find(query);
  277. } catch (CompassException ce) {
  278. LOG.error("searchExtParty raises exception: ", ce);
  279. }finally{
  280. cmpSess.close();
  281. }
  282. }
  283. return hits;
  284. }*/
  285.  
  286.  
  287. public List<JsonBean> searchList(String query){
  288. List<JsonBean> results = new ArrayList<JsonBean>();
  289. /*CompassSession cmpSess = null;
  290.  
  291. if (query!=null){
  292. try
  293. {
  294. cmpSess = compass.openSession();
  295. CompassHits hits = cmpSess.find(query);
  296. for (int i = 0; i < hits.getLength(); i++) {
  297. results.add((AccountIndex) hits.data(i));
  298. }
  299. } catch (CompassException ce) {
  300. LOG.error("searchIoi raises exception: ", ce);
  301. }finally{
  302. cmpSess.close();
  303. }
  304. }*/
  305.  
  306. return results;
  307. }
  308.  
  309.  
  310. public List<JsonBean> loadAllList() {
  311. /*ArrayList<String> results = new ArrayList<String>();
  312. SearchResponse response = esNode.getClient().prepareSearch(indexName).setSearchType(indexType).execute().actionGet();
  313. SearchHits hitsObj = response.getHits();
  314. return hitsObj;*/
  315.  
  316. throw new UnsupportedOperationException("We don't want to return millions of results in an array list should query through SearchHits API");
  317. }
  318.  
  319. /*@Override
  320. public CompassHits loadAll() {
  321. CompassHits results = search("alias:accountindex");
  322. return results;
  323. }*/
  324.  
  325.  
  326. public void CUDParties(List<Map<String, Object>> contactsLst) {
  327. // TODO Auto-generated method stub
  328.  
  329. }
  330.  
  331. public Boolean bulkDelete(Integer id, BulkRequestBuilder bulkOperation, String type) {
  332.  
  333. DeleteRequestBuilder del = esNode.getClient().prepareDelete().setIndex(getIndexName()).setId(id.toString()).setType(type);
  334. bulkOperation.add(del);
  335. Boolean createNewBulkOperation = processBulkIfNeeded(bulkOperation, bulkOperations++);
  336. return createNewBulkOperation;
  337. }
  338.  
  339. public Boolean bulkDelete(JsonBean jsonBean, BulkRequestBuilder bulkOperation) {
  340.  
  341. DeleteRequestBuilder del = esNode.getClient().prepareDelete(indexName, jsonBean.getIndexType(), "" + jsonBean.getId());
  342. bulkOperation.add(del);
  343. Boolean createNewBulkOperation = processBulkIfNeeded(bulkOperation, bulkOperations++);
  344. return createNewBulkOperation;
  345. }
  346.  
  347. public Boolean bulkSave(JsonBean jsonBean, BulkRequestBuilder bulkOperation) {
  348.  
  349. IndexRequest indexOperation = Requests.indexRequest(indexName)
  350. .type(jsonBean.getIndexType())
  351. .id("" + jsonBean.getId())
  352. .create(false)
  353. .source(jsonBean.getJson());
  354.  
  355.  
  356. bulkOperation.add(indexOperation);
  357. Boolean createNewBulkOperation = processBulkIfNeeded(bulkOperation, bulkOperations++);
  358. return createNewBulkOperation;
  359.  
  360. }
  361.  
  362. private Boolean processBulkIfNeeded(BulkRequestBuilder bulkOperation, int bulkOperations) {
  363.  
  364. Boolean createNewBulkReq = false;
  365. if (bulkOperation.numberOfActions() >= bulkSize) {
  366. // execute the bulk operation
  367. int currentOnGoingBulks = onGoingBulks.incrementAndGet();
  368. LOG.info("Ongoing Bulks = " + currentOnGoingBulks + ", Index Operations = " + bulkOperations );
  369. if (currentOnGoingBulks > dropThreshold) {
  370. // TODO, just wait here!, we can slow down the parsing
  371. onGoingBulks.decrementAndGet();
  372. String message = "dropping bulk, " + onGoingBulks + " crossed threshold " + dropThreshold + ", Index Operations = " + bulkOperations;
  373. LOG.error(message);
  374. registerError(new Exception(message));
  375. } else {
  376. try {
  377. final int bulkNo = onGoingBulks.get();
  378. LOG.info("Executing Bulk Request " + bulkNo + " for index " + getIndexName());
  379. bulkOperation.execute(new ActionListener<BulkResponse>() {
  380. @Override public void onResponse(BulkResponse bulkResponse) {
  381.  
  382. onGoingBulks.decrementAndGet();
  383. LOG.info("Bulk [" + bulkNo + "] of " + getIndexName() + " Index Executed");
  384. if (bulkResponse.hasFailures()){
  385. RuntimeException indexingException = new RuntimeException(bulkResponse.buildFailureMessage());
  386. registerError(indexingException);
  387. }
  388. }
  389.  
  390. @Override public void onFailure(Throwable indexingException) {
  391. Exception exception = new Exception(indexingException);
  392. registerError(exception);
  393. }
  394. });
  395. } catch (Exception indexingException) {
  396. registerError(indexingException);
  397. }
  398. }
  399. // once we have executed a bulk request, create a new one for adding
  400. // a fresh set of bulk updates, deletes, additions etc.
  401. createNewBulkReq = true;
  402.  
  403. }
  404. return createNewBulkReq;
  405. }
  406.  
  407. private void registerError(Exception indexingException) {
  408. BuildIndex.statusFlag = BuildIndex.ERROR;
  409. BuildIndex.exception = indexingException;
  410. LOG.error("Indexing Exception", indexingException);
  411. }
  412.  
  413.  
  414. public String getIndexName() {
  415. return indexName;
  416. }
  417.  
  418.  
  419. public void setIndexName(String indexName) {
  420. this.indexName = indexName;
  421. }
  422.  
  423.  
  424. public int getBulkSize() {
  425. return bulkSize;
  426. }
  427.  
  428.  
  429. public void setBulkSize(int bulkSize) {
  430. this.bulkSize = bulkSize;
  431. }
  432.  
  433.  
  434. public int getDropThreshold() {
  435. return dropThreshold;
  436. }
  437.  
  438.  
  439. public void setDropThreshold(int dropThreshold) {
  440. this.dropThreshold = dropThreshold;
  441. }
  442.  
  443.  
  444. public AtomicInteger getOnGoingBulks() {
  445. return onGoingBulks;
  446. }
  447.  
  448. public ESNodeWrapper getEsNode() {
  449. return esNode;
  450. }
  451.  
  452. }
  453. ====================================================================================================================
  454. package ****.index.bean;
  455.  
  /**
   * Mutable holder for one document to be indexed: its JSON source, numeric id, and the index
   * name and index type it belongs to.
   */
  public class JsonBean {

    private String json;
    private int id;
    private String indexName;
    private String indexType;

    /** Creates an empty bean; all fields keep their Java defaults until set. */
    public JsonBean() {
    }

    /**
     * Creates a fully populated bean.
     *
     * @param json      JSON source of the document
     * @param id        numeric document id
     * @param indexName name of the index the document belongs to
     * @param indexType type of the document within the index
     */
    public JsonBean(String json, int id, String indexName, String indexType) {
      setJson(json);
      setId(id);
      setIndexName(indexName);
      setIndexType(indexType);
    }

    /** @return the JSON source of the document */
    public String getJson() {
      return this.json;
    }

    /** @param json JSON source of the document */
    public void setJson(String json) {
      this.json = json;
    }

    /** @return the numeric document id */
    public int getId() {
      return this.id;
    }

    /** @param id numeric document id */
    public void setId(int id) {
      this.id = id;
    }

    /** @return the name of the index the document belongs to */
    public String getIndexName() {
      return this.indexName;
    }

    /** @param indexName name of the index the document belongs to */
    public void setIndexName(String indexName) {
      this.indexName = indexName;
    }

    /** @return the type of the document within the index */
    public String getIndexType() {
      return this.indexType;
    }

    /** @param indexType type of the document within the index */
    public void setIndexType(String indexType) {
      this.indexType = indexType;
    }
  }
  501.  
  502. ===============================================================================================================================
  503. package ****.elasticsearch.buildIndex;
  504.  
  505. import static ****.elasticsearch.util.ESConstants.ALIAS_CONTACT;
  506. import static ****.elasticsearch.util.ESConstants.INDEX_CONTACT;
  507.  
  508. import java.io.IOException;
  509. import java.util.ArrayList;
  510. import java.util.Collection;
  511. import java.util.List;
  512.  
  513. import org.apache.commons.logging.Log;
  514. import org.apache.commons.logging.LogFactory;
  515. import org.codehaus.jackson.JsonGenerationException;
  516. import org.codehaus.jackson.map.JsonMappingException;
  517. import org.codehaus.jackson.map.ObjectMapper;
  518. import org.springframework.beans.factory.annotation.Autowired;
  519.  
  520. import ****.elasticsearch.index.ESContactIndexer;
  521. import ****.elasticsearch.index.IndexCleanup;
  522. import ****.elasticsearch.util.ESAdmin;
  523. import ****.index.bean.Contact;
  524. import ****.index.bean.ContactJSONWrapper;
  525. import ****.index.bean.JsonBean;
  526. import ****.index.compass.QueuedIndexingThread;
  527. import ****.index.jdbc.BaseContactJDBCIndexer;
  528. import ****.index.jdbc.BuildIndex;
  529. import ****.index.jdbc.IntegerRowMapper;
  530. import ****.index.jdbc.JDBCIndexer;
  531. import ****.index.jdbc.JDBCQueries;
  532. import ****.index.jdbc.JDBCQueryHelper;
  533. import ****.util.CSUtils;
  534.  
  535. public class ESContactJDBCIndexer extends BaseContactJDBCIndexer implements JDBCIndexer, ElasticSearchIndexer{ //JdbcDaoSupport{
  536.  
  537. final static Log LOG = LogFactory.getLog(ESContactJDBCIndexer.class);
  538. QueuedIndexingThread<JsonBean> indexThread = new QueuedIndexingThread<JsonBean>("ESContact Indexer");
  539.  
  540. @Autowired(required=true) ESContactIndexer esContactIndexing;
  541.  
  542. public void init() {
  543.  
  544. }
  545.  
  546.  
  547. public void doIndex() throws Exception {
  548. IndexCleanup.cleanup(esContactIndexing.getEsNode(), "Contact JDBC");
  549. Thread.sleep(1000);
  550. String oldIndexName = getOldIndexName();
  551. LOG.info("Old Index Name[" + oldIndexName + "]");
  552. String newIndexName = createNewIndex();
  553. LOG.info("New Index Name[" + newIndexName + "]");
  554. esContactIndexing.setIndexName(newIndexName);
  555. // This is the big data one
  556. getDBDataAndCreateNewIndex();
  557. LOG.info("Removing alias [" + ALIAS_CONTACT + "] for [" + oldIndexName + "]");
  558. removeAliasOldIndex(oldIndexName, ALIAS_CONTACT);
  559. LOG.info("Adding alias [" + ALIAS_CONTACT + "] for [" + newIndexName + "]");
  560. aliasNewIndex(newIndexName, ALIAS_CONTACT);
  561. //?????? runWarmUpQueries ????? // Lucene 3.0.3 (I think) needs one-off warm up queries.
  562. Thread.sleep(1000);
  563. LOG.info("Deleting index [" + oldIndexName + "]");
  564. deleteOldIndex(oldIndexName);
  565. }
  566.  
  567. @Override
  568. public void removeAliasOldIndex(String oldIndexName, String aliasContact) {
  569. ESAdmin.removeAlias(esContactIndexing.getEsNode(), oldIndexName, aliasContact);
  570. }
  571.  
  572. @Override
  573. public void aliasNewIndex(String newIndexName, String aliasContact) {
  574. ESAdmin.addAlias(esContactIndexing.getEsNode(), newIndexName, aliasContact);
  575. }
  576.  
  577. @Override
  578. public void deleteOldIndex(String oldIndexName) {
  579. ESAdmin.deleteIndex(esContactIndexing.getEsNode(), oldIndexName);
  580. }
  581.  
  582. @Override
  583. public String createNewIndex() {
  584. return EsJdbcHelper.createNewIndexOfType(INDEX_CONTACT, esContactIndexing.getEsNode());
  585. }
  586.  
  587. @Override
  588. public String getOldIndexName() {
  589. return EsJdbcHelper.getOldIndexNameOfType(INDEX_CONTACT, esContactIndexing.getEsNode());
  590. }
  591.  
  592. @Override
  593. public void getDBDataAndCreateNewIndex() throws Exception, InterruptedException {
  594. List<Integer> allPersonPartyIds = getPersonIds();
  595. int size = allPersonPartyIds.size();
  596. if(size==0) return;
  597.  
  598. indexThread.setIndexingTools(esContactIndexing);
  599. indexThread.setStopWhenEmpty(false);
  600. indexThread.setUpdateMode(false);
  601. LOG.info("*** Number of contacts to Index = " + size);
  602. // set chunk size to give about 5000 documents per chunk
  603. int numberOfChunks = size / 5000;
  604. LOG.info("numberOfChunks = " + numberOfChunks);
  605. List<JDBCQueryHelper.HiLowId> idBetweenList = JDBCQueryHelper.createBetweenIdList(allPersonPartyIds, numberOfChunks);
  606.  
  607. int count = 0;
  608. for (JDBCQueryHelper.HiLowId hiLo: idBetweenList){
  609. if (BuildIndex.statusFlag == BuildIndex.ERROR){
  610. LOG.error("Error In Indexbuilding, stopping now.", BuildIndex.exception);
  611. return;
  612. }
  613.  
  614. Collection<Contact> parties = mapContacts(hiLo);
  615. indexCollection(parties);
  616. ESAdmin.getStats(esContactIndexing.getEsNode(), LOG);
  617. LOG.info("Pass number " + ++count + " executed.");
  618. }
  619.  
  620. Thread.sleep(2000);
  621. indexThread.setStopWhenEmpty(true);
  622.  
  623. try {
  624. Thread.sleep(5000);
  625. while (indexThread.isStillRunning()){
  626. Thread.sleep(5000);
  627. }
  628. } catch (InterruptedException e) {}
  629. }
  630.  
  631.  
  632. public List<Integer> getPersonIds() {
  633.  
  634. List <Integer> ids = getJdbcTemplate().query(
  635. JDBCQueries.GET_ALL_PERSON_IDS_PROC, new IntegerRowMapper());
  636.  
  637. return ids;
  638. }
  639.  
  640. public void setExtPartyIndexing(ESContactIndexer contactIndexing) {
  641. this.esContactIndexing = contactIndexing;
  642. }
  643.  
  644. public void doIndexLimited(String contactIds) throws Exception {
  645. esContactIndexing.setIndexName(ALIAS_CONTACT);
  646. LOG.info("Contact Ids[" + contactIds + "]");
  647. if(contactIds==null || "".equals(contactIds)) {
  648. LOG.warn("Empty contactIds!! Returning now.");
  649. return;
  650. }
  651.  
  652. Collection<Contact> parties = mapContactsLimited(contactIds);
  653. List<JsonBean> jsonList = getCollectionAsJsonBeanList(parties);
  654. esContactIndexing.saveOrUpdateBatch(jsonList);
  655. // refresh the index before returning, we do it TWICE deliberately
  656. esContactIndexing.refresh();
  657. esContactIndexing.refresh();
  658. }
  659.  
  660. void indexCollection(Collection<Contact> parties) throws Exception {
  661. List<JsonBean> jsonList = getCollectionAsJsonBeanList(parties);
  662. LOG.info("created " + jsonList.size() + " contact json beans.");
  663. indexJSONCollection(jsonList);
  664.  
  665. }
  666.  
  667. private List<JsonBean> getCollectionAsJsonBeanList(
  668. Collection<Contact> parties) throws IOException,
  669. JsonGenerationException, JsonMappingException {
  670. List<JsonBean> jsonList = new ArrayList<JsonBean>(parties.size());
  671. int i = 0;
  672. for (Contact contact : parties){
  673. ContactJSONWrapper wrapperForLessVerboseJson = new ContactJSONWrapper(contact);
  674. String json1 = CSUtils.mapper().writeValueAsString(wrapperForLessVerboseJson);
  675. if (json1 != null){
  676. if (i == 0){
  677. LOG.info("contact json = " + json1);
  678. i++;
  679. }
  680. JsonBean jsonBean = new JsonBean(json1, contact.getPartyId(), esContactIndexing.getIndexName() , contact.getType());
  681. jsonList.add(jsonBean);
  682. }
  683. }
  684. return jsonList;
  685. }
  686.  
  687. private void indexJSONCollection(Collection<JsonBean> parties) {
  688. indexThread.addToIndexQueAndIncrementBatchNumber(parties);
  689. if (!indexThread.isStarted()){
  690. indexThread.start();
  691. }
  692. }
  693.  
  694. }
  695.  
  696. ==============================================================================================================================
  697.  
  698. package ****.index.compass;
  699.  
  700. import java.util.Collection;
  701. import java.util.Collections;
  702. import java.util.LinkedList;
  703. import java.util.List;
  704. import java.util.concurrent.atomic.AtomicInteger;
  705.  
  706. import org.apache.commons.logging.Log;
  707. import org.apache.commons.logging.LogFactory;
  708.  
  709. public class QueuedIndexingThread<T> extends Thread {
  710.  
  711. private String description;
  712.  
  713. /**
  714. * Use this if you know how many batches you need to index at instatiation
  715. * time. - use "addToIndexQueAndIncrementBatchNumber" when adding 'chunks'
  716. * to index. - do not use "addToIndexQue"
  717. *
  718. * - also this way seems to be slower than using predefined chunk size,
  719. * pehaps because we are changing variable values in the middle of a thread
  720. * causing more context switching.
  721. *
  722. * @param numberOfBatchesToIndex
  723. */
  724. public QueuedIndexingThread(String description) {
  725. this.numberOfBatchesToIndex = 0;
  726. this.isPreConfiguredChunkSize = false;
  727. this.description = description;
  728. }
  729.  
  730. /**
  731. * Use this if you know how many batches you need to index at instatiation
  732. * time. - use "addToIndexQue" when adding 'chunks' to index. - do not use
  733. * not "addToIndexQueAndIncrementBatchNumber"
  734. *
  735. * @param numberOfBatchesToIndex
  736. */
  737. public QueuedIndexingThread(int numberOfBatchesToIndex, String description) {
  738. this.numberOfBatchesToIndex = numberOfBatchesToIndex;
  739. this.isPreConfiguredChunkSize = true;
  740. this.description = description;
  741. }
  742.  
  743. private final static Log LOG = LogFactory.getLog(QueuedIndexingThread.class);
  744.  
  745. private IIndexing<T> indexing;
  746. int numberOfBatchesToIndex = 0;
  747.  
  748. private boolean isStarted = false;
  749. private boolean doNotStop = true;
  750. private boolean stopWhenEmpty = true;
  751.  
  752. private Boolean isPreConfiguredChunkSize;
  753. private Boolean updateMode;
  754. private int countIndexesFinished;
  755.  
  756. public Boolean getUpdateMode() {
  757. return updateMode;
  758. }
  759.  
  760. public void setUpdateMode(Boolean updateMode) {
  761. this.updateMode = updateMode;
  762. }
  763.  
  764. private LinkedList<Collection<T>> que = new LinkedList<Collection<T>>();
  765.  
  766. @Override
  767. public void run() {
  768.  
  769. isStarted = true;
  770.  
  771. LOG.info("Starting Indexing" + description);
  772.  
  773. while (doNotStop) {
  774.  
  775. try {
  776.  
  777. Collection<T> indexAble = que.poll();
  778.  
  779. if ( indexAble != null) {
  780.  
  781. LOG.info( String.format( "Starting Indexing [ %s ] Batch number [ %s ]", description, ++countIndexesFinished ) );
  782.  
  783. if (updateMode) {
  784.  
  785. indexing.saveOrUpdateBatch(indexAble);
  786.  
  787. } else {
  788.  
  789. // do not do update when indexing re-index by indexing an new index and deleting the
  790. // old one then alias the new index to a constant alias name.
  791. indexing.saveBatch(indexAble);
  792. }
  793.  
  794. indexAble.clear();
  795.  
  796. indexAble = null;
  797.  
  798. LOG.info( String.format( "Finished Indexing [ %s ] Batch number [ %s ]", description, countIndexesFinished ) );
  799.  
  800. }
  801.  
  802. Thread.sleep(1000);
  803.  
  804. doNotStop = (countIndexesFinished < numberOfBatchesToIndex) || !stopWhenEmpty || que.size() > 0;
  805.  
  806. } catch (InterruptedException ex) {
  807.  
  808. throw new RuntimeException(ex);
  809.  
  810. }
  811.  
  812. }
  813.  
  814. LOG.info("Shutting down Indexer" + this);
  815.  
  816. try {
  817.  
  818. Thread.sleep(25000);
  819.  
  820. } catch (InterruptedException e) {
  821.  
  822. e.printStackTrace();
  823. }
  824. }
  825.  
  826. public void addToIndexQue(Collection<T> indexableChunk) {
  827.  
  828. if (!isPreConfiguredChunkSize) {
  829. throw new IllegalStateException("If you have not pre-defined your chunk size use 'addToIndexQueAndIncrementBatchNumber' to add chunks");
  830. }
  831.  
  832. que.add(indexableChunk);
  833. }
  834.  
  835. /**
  836. *
  837. * Note - this way seems to be slower than using predefined chunk size,
  838. * pehaps because we are changing variable values in the middle of a thread
  839. * causing more context switching.
  840. *
  841. * @param indexableChunk
  842. */
  843. public void addToIndexQueAndIncrementBatchNumber(Collection<T> indexableChunk) {
  844.  
  845. if (isPreConfiguredChunkSize) {
  846. throw new IllegalStateException( "If you have pre-defined your chunk size use 'addToIndexQue' to add chunks");
  847. }
  848.  
  849. this.numberOfBatchesToIndex++;
  850. que.add(indexableChunk);
  851.  
  852. LOG.info(String.format(" ADDED BATCH [ %s ] ", this.numberOfBatchesToIndex) );
  853.  
  854. }
  855.  
  856. public boolean isStarted() {
  857. return isStarted;
  858. }
  859.  
  860. public void setIndexingTools(IIndexing<T> indexing) {
  861. this.indexing = indexing;
  862. }
  863.  
  864. public boolean isStillRunning() {
  865. return doNotStop;
  866. }
  867.  
  868. public String stopStatus() {
  869.  
  870. return
  871. String.format( "doNotStop [ %s ] countIndexesFinished < numberOfBatchesToIndex [ %s ], !stopWhenEmpty [ %s ], que.size [ %s ]",
  872. doNotStop, (countIndexesFinished < numberOfBatchesToIndex), !stopWhenEmpty, que.size() );
  873. }
  874.  
  875. /**
  876. * Set this to false to keep the Thread going because we expect more data to
  877. * be added later. - once all data has been added you must manually set this
  878. * back to true or the thread will continue for ever.
  879. *
  880. * @param stopWhenEmpty
  881. */
  882. public void setStopWhenEmpty(boolean stopWhenEmpty) {
  883. this.stopWhenEmpty = stopWhenEmpty;
  884. }
  885.  
  886. }
  887.  
  888.  
  889.  
  890.  
  891.  
  892.  
  893. ======================================================================================================
Add Comment
Please, Sign In to add comment