Advertisement
Guest User

Untitled

a guest
May 6th, 2015
253
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.60 KB | None | 0 0
  1. /* @flow */
  2.  
  3. var { Observable } = require('rx');
  4. var client = require('mongodb');
  5. var { assign } = require('lodash');
  6. var __DEV__ = process.env.NODE_ENV !== 'production';
  7. var URL = 'my database url';
  8.  
  9. class QueryBuilder {
  10. _db$: Observable;
  11. _selectors: Object;
  12.  
  13. constructor(db$: Observable, selectors?: Object) {
  14. this._db$ = db$;
  15.  
  16. if (!selectors) {
  17. this._selectors = assign({
  18. collection: null,
  19. query: {},
  20. opts: {},
  21. sort: {},
  22. offset: 0,
  23. limit: 0
  24. }, selectors);
  25. }
  26. else {
  27. this._selectors = selectors;
  28. }
  29. }
  30.  
  31. static connect(url: string): QueryBuilder {
  32. var connect = Observable.fromNodeCallback(client.connect, client);
  33. var db$ = connect(db.url);
  34.  
  35. db$.subscribe(
  36. _ => {
  37. if (__DEV__) {
  38. console.log('Connected to database on', url);
  39. }
  40. },
  41. err => console.log('Database connection error:', err.message, err.stack)
  42. );
  43.  
  44. return new QueryBuilder(db$);
  45. }
  46.  
  47. close(): any {
  48. this._db$.dispose();
  49.  
  50. return QueryBuilder;
  51. }
  52.  
  53. collection(name: string): QueryBuilder {
  54. var ss = assign({}, this._selectors, { collection: name });
  55. return new QueryBuilder(this._db$, ss);
  56. }
  57.  
  58. select(query?: Object = {}, opts?: Object = {}): QueryBuilder {
  59. var ss = assign({}, this._selectors, { query, opts });
  60. return new QueryBuilder(this._db$, ss);
  61. }
  62.  
  63. selectOne(query?: Object = {}, opts?: Object = {}): QueryBuilder {
  64. var ss = assign({}, this._selectors, {
  65. query: query,
  66. opts: opts,
  67. limit: 1
  68. });
  69. return new QueryBuilder(this._db$, ss);
  70. }
  71.  
  72. sort(sort: Object): QueryBuilder {
  73. var ss = assign({}, this._selectors, { sort });
  74. return new QueryBuilder(this._db$, ss);
  75. }
  76.  
  77. skip(offset: number): QueryBuilder {
  78. var ss = assign({}, this._selectors, { offset });
  79. return new QueryBuilder(this._db$, ss);
  80. }
  81.  
  82. limit(limit: number = 0): QueryBuilder {
  83. var ss = assign({}, this._selectors, { limit });
  84. return new QueryBuilder(this._db$, ss);
  85. }
  86.  
  87. exec(): Observable {
  88. var ss = this._selectors;
  89. var db$ = this._db$
  90.  
  91. if (!ss.collection) return Observable.throw('You have to provide collection name.');
  92. if (!db$) return Observable.throw('No db connection found.');
  93.  
  94. var o = db$.flatMapLatest(db => {
  95. var c = db.collection(ss.collection);
  96. var cursor = c.find(ss.query, ss.opts).sort(ss.sort).skip(ss.offset).limit(ss.limit);
  97. var obs = Observable.fromNodeCallback(cursor.toArray, cursor);
  98. return obs();
  99. });
  100.  
  101. return ss.limit === 1 ? o.map(res => res[0]) : o;
  102. }
  103. }
  104.  
  105. module.exports = QueryBuilder.connect(URL);
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement