Advertisement
Guest User

Untitled

a guest
Oct 15th, 2019
82
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.93 KB | None | 0 0
  1. const {forEach, fromIter, map, filter, pipe, fromEvent, merge, interval} = require('callbag-basics')
  2. const Queue = require('better-queue')
  3. const low = require('lowdb')
  4. const FileSync = require('lowdb/adapters/FileSync')
  5.  
  /**
   * Wraps a better-queue instance together with a lowdb JSON file that keeps a
   * list (`q`) of task records. Records are written to the file when a task
   * fails, removed when a task finishes, and pushed back onto the in-memory
   * queue at construction time and then periodically.
   */
  class DBQ {
    /** Default delay between two re-enqueue passes over the stored records (5 minutes). */
    static DEFAULT_REQUEUE_INTERVAL_MS = 5 * 60 * 1000;

    /**
     * Builds the record shape that is stored in the db and pushed to the queue.
     * Only these five fields are kept; anything else on `src` is dropped.
     *
     * @param {object} src - object carrying the task fields
     * @returns {{rf_id: *, file_name: *, file_path: *, festival: *, experience: *}}
     */
    static toRecord({ rf_id, file_name, file_path, festival, experience }) {
      return { rf_id, file_name, file_path, festival, experience };
    }

    /**
     * @param {Function} process - worker function handed to the Queue constructor
     * @param {{option: *}} options - only the `option` key is read
     * @param {string} dbPath - path given to the lowdb FileSync adapter
     * @param {number} [requeueIntervalMs] - delay between re-enqueue passes;
     *   defaults to DBQ.DEFAULT_REQUEUE_INTERVAL_MS
     */
    constructor(process, { option }, dbPath, requeueIntervalMs = DBQ.DEFAULT_REQUEUE_INTERVAL_MS) {
      const adapter = new FileSync(dbPath);
      const db = low(adapter);
      // NOTE(review): the queue receives an object whose single key is `option`,
      // not the option values themselves. Left unchanged because the intended
      // calling convention is not visible here - confirm against callers.
      this.q = new Queue(process, { option });
      this.db = db;
      this.db.defaults({ q: [] }).write();

      // Drain whatever is already stored, then keep retrying on a timer.
      this.fromDB2Q();
      // Keep the handle so stop() can cancel the timer.
      this.requeueTimer = setInterval(() => this.fromDB2Q(), requeueIntervalMs);
    }

    /** Cancels the periodic re-enqueue timer started by the constructor. */
    stop = () => {
      clearInterval(this.requeueTimer);
      this.requeueTimer = null;
    };

    /** Pushes every record currently stored under `q` onto the in-memory queue. */
    fromDB2Q = () => {
      const items = this.db.get('q').value();
      items.forEach((item) => {
        console.log(item);
        this.q.push(DBQ.toRecord(item));
        // Interpolating the object itself would print "[object Object]".
        console.log(`retrieved csv ${item.file_name}, added to q`);
      });
    };

    /**
     * Callbag source factory: greets the sink, then emits the queue instance once.
     * The stream is left open afterwards (no completion signal is sent).
     *
     * @returns {Function} callbag source
     */
    startQ = () => (start, sink) => {
      if (start !== 0) return;
      let disposed = false;
      sink(0, (t) => {
        if (t === 2) {
          // The sink asked to terminate: stop talking to it. Nothing is sent
          // back, since a type-2 message with a payload would read as an error.
          disposed = true;
          console.log('disposed');
        }
      });
      // The sink may have terminated synchronously during the handshake.
      if (!disposed) sink(1, this.q);
    };

    /**
     * Pushes a task onto the queue.
     *
     * @param {*} input - task passed to the queue
     * @param {Function} [cb] - callback passed through to the queue's push
     * @returns {*} whatever the queue's push returns
     */
    add = (input, cb) => this.q.push(input, cb);

    /**
     * Callbag operator: for every value received (expected to be the queue),
     * registers `fac` as a listener for event `name`, then forwards the value.
     * Every other message type is forwarded untouched.
     *
     * @param {string} name - event name
     * @param {Function} fac - listener to register
     * @returns {Function} callbag operator
     */
    addEvent = (name, fac) => (source) => (start, sink) => {
      if (start !== 0) return;
      source(0, (t, d) => {
        if (t === 1) {
          d.on(name, fac);
          sink(1, d);
        } else {
          sink(t, d);
        }
      });
    };

    /**
     * Listener for 'task_finish': removes the stored record whose file_name
     * matches the payload, if there is one.
     *
     * NOTE(review): the code treats the second argument as the original task
     * input; whether the queue actually passes that (rather than the worker's
     * result) is not visible here - confirm against the queue's event docs.
     *
     * @param {*} taskId
     * @param {object} [input] - payload expected to carry `file_name`
     * @param {*} stats
     */
    onTaskDone = (taskId, input, stats) => {
      const fileName = input?.file_name;
      // A payload without a file_name has nothing to look up; skipping also
      // avoids a TypeError when the payload is undefined.
      if (fileName != null) {
        const stored = this.db.get('q').find({ file_name: fileName }).value();
        if (stored !== undefined) {
          this.db.get('q').remove({ file_name: fileName }).write();
        }
      }
      console.log("task completed!");
    };

    /**
     * Listener for 'task_failed': stores the task record so a later
     * fromDB2Q() pass retries it, unless a record with the same file_name is
     * already stored.
     *
     * NOTE(review): same caveat as onTaskDone about what the second argument
     * really is - confirm against the queue's event docs.
     *
     * @param {*} taskId
     * @param {object} [input] - payload expected to carry the task fields
     * @param {*} stats
     */
    onError = (taskId, input, stats) => {
      const fileName = input?.file_name;
      if (fileName == null) {
        // Without a file_name the record could never be matched for removal
        // and would be re-enqueued on every pass, so it is not stored.
        console.log('error, failure payload has no file_name; nothing persisted ->', input);
        return;
      }
      const alreadyStored = this.db.get('q').filter({ file_name: fileName }).value();
      if (alreadyStored.length === 0) {
        this.db.get('q').push(DBQ.toRecord(input)).write();
      }
      console.log(`error, adding back to q -> ${fileName}`);
    };

    /** Wires the finish/failure listeners onto the queue by running the callbag pipeline. */
    run = () => {
      const stream = pipe(
        this.startQ(),
        this.addEvent('task_finish', this.onTaskDone),
        this.addEvent('task_failed', this.onError),
      );
      forEach(() => {})(stream);
    };
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement