Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.vertigo.crawler.persistence.queue;
- import java.util.List;
- import java.util.concurrent.TimeUnit;
- import com.typesafe.config.Config;
- import io.reactivex.BackpressureStrategy;
- import io.reactivex.Flowable;
- import io.reactivex.Observable;
- import lombok.SneakyThrows;
- import lombok.val;
- import org.sql2o.Sql2o;
- import org.sql2o.quirks.QuirksDetector;
- import com.vertigo.core.config.ds.DbConfig;
- import com.vertigo.core.config.ds.DbConfigFactory;
- import com.vertigo.core.config.typesafe.TypesafeConfigLocator;
- import com.vertigo.model.enums.Provider;
- import com.vertigo.model.json.sync.CrawlerTask;
- import com.vertigo.service.module.MigratingDataSourceProvider;
- public class FlowableRun {
- private QueueOverDb qdb;
- @SneakyThrows
- private void run() {
- val config = TypesafeConfigLocator.getConfig("vertigo-crawler-ws.conf");
- val sql2o = createSql2o(config);
- this.qdb = new QueueOverDb.Builder(sql2o::beginTransaction)
- .internalTx(true)
- .build();
- val timer = Observable.interval(1, TimeUnit.SECONDS);
- timer.subscribe(i ->
- System.out.println("i = " + i)
- );
- Thread.sleep(60000); // Just to keep the program running
- //while (true) {
- // Thread.sleep(10);
- //}
- //val flowable = flowablePeek(qdb);
- //flowable.subscribe(ls ->
- // System.out.println("ls = " + ls)
- //);
- }
- private Sql2o createSql2o(Config config) {
- try {
- // create the database
- final DbConfigFactory factory = new DbConfigFactory();
- final DbConfig dbConfig = factory.create(config);
- TypedDbUtil.ensureDatabaseExists(dbConfig);
- final MigratingDataSourceProvider dsp =
- new MigratingDataSourceProvider(config);
- return new Sql2o(dsp.get(), QuirksDetector.forURL(dbConfig.protocol));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- public Flowable<List<CrawlerTask>> flowablePeek(Queue<CrawlerTask> queue) {
- Flowable<List<CrawlerTask>> listFlowable = Flowable.create(e -> {
- if (e.isCancelled()) {
- return;
- }
- try {
- e.onNext(queue.peek(Provider.Spotify));
- } catch (Exception ex) {
- e.onError(ex);
- }
- }, BackpressureStrategy.BUFFER);
- return listFlowable;
- }
- public static void main(String[] args) {
- new FlowableRun().run();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement