Advertisement
NLinker

Flowable interval

Jun 3rd, 2017
485
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 2.58 KB | None | 0 0
  1. package com.vertigo.crawler.persistence.queue;
  2.  
  3. import java.util.List;
  4. import java.util.concurrent.TimeUnit;
  5. import com.typesafe.config.Config;
  6. import io.reactivex.BackpressureStrategy;
  7. import io.reactivex.Flowable;
  8. import io.reactivex.Observable;
  9. import lombok.SneakyThrows;
  10. import lombok.val;
  11. import org.sql2o.Sql2o;
  12. import org.sql2o.quirks.QuirksDetector;
  13.  
  14. import com.vertigo.core.config.ds.DbConfig;
  15. import com.vertigo.core.config.ds.DbConfigFactory;
  16. import com.vertigo.core.config.typesafe.TypesafeConfigLocator;
  17. import com.vertigo.model.enums.Provider;
  18. import com.vertigo.model.json.sync.CrawlerTask;
  19. import com.vertigo.service.module.MigratingDataSourceProvider;
  20.  
  21. public class FlowableRun {
  22.     private QueueOverDb qdb;
  23.  
  24.     @SneakyThrows
  25.     private void run() {
  26.         val config = TypesafeConfigLocator.getConfig("vertigo-crawler-ws.conf");
  27.         val sql2o = createSql2o(config);
  28.         this.qdb = new QueueOverDb.Builder(sql2o::beginTransaction)
  29.             .internalTx(true)
  30.             .build();
  31.         val timer = Observable.interval(1, TimeUnit.SECONDS);
  32.         timer.subscribe(i ->
  33.                 System.out.println("i = " + i)
  34.             );
  35.  
  36.         Thread.sleep(60000); // Just to keep the program running
  37.  
  38.         //while (true) {
  39.         //    Thread.sleep(10);
  40.         //}
  41.         //val flowable = flowablePeek(qdb);
  42.         //flowable.subscribe(ls ->
  43.         //    System.out.println("ls = " + ls)
  44.         //);
  45.     }
  46.  
  47.     private Sql2o createSql2o(Config config) {
  48.         try {
  49.             // create the database
  50.             final DbConfigFactory factory = new DbConfigFactory();
  51.             final DbConfig dbConfig = factory.create(config);
  52.             TypedDbUtil.ensureDatabaseExists(dbConfig);
  53.             final MigratingDataSourceProvider dsp =
  54.                 new MigratingDataSourceProvider(config);
  55.             return new Sql2o(dsp.get(), QuirksDetector.forURL(dbConfig.protocol));
  56.         } catch (Exception e) {
  57.             throw new RuntimeException(e);
  58.         }
  59.     }
  60.  
  61.     public Flowable<List<CrawlerTask>> flowablePeek(Queue<CrawlerTask> queue) {
  62.         Flowable<List<CrawlerTask>> listFlowable = Flowable.create(e -> {
  63.             if (e.isCancelled()) {
  64.                 return;
  65.             }
  66.             try {
  67.                 e.onNext(queue.peek(Provider.Spotify));
  68.             } catch (Exception ex) {
  69.                 e.onError(ex);
  70.             }
  71.         }, BackpressureStrategy.BUFFER);
  72.         return listFlowable;
  73.     }
  74.  
  75.     public static void main(String[] args) {
  76.         new FlowableRun().run();
  77.     }
  78.  
  79. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement