Advertisement
Not a member of Pastebin yet?
Sign Up — it unlocks many cool features!
- /* @flow */
- var { Observable } = require('rx');
- var client = require('mongodb');
- var { assign } = require('lodash');
// Development mode is anything other than an explicit NODE_ENV=production.
var __DEV__ = !(process.env.NODE_ENV === 'production');
// Connection string passed to the MongoDB client when this module connects.
var URL = 'my database url';
- class QueryBuilder {
- _db$: Observable;
- _selectors: Object;
- constructor(db$: Observable, selectors?: Object) {
- this._db$ = db$;
- if (!selectors) {
- this._selectors = assign({
- collection: null,
- query: {},
- opts: {},
- sort: {},
- offset: 0,
- limit: 0
- }, selectors);
- }
- else {
- this._selectors = selectors;
- }
- }
- static connect(url: string): QueryBuilder {
- var connect = Observable.fromNodeCallback(client.connect, client);
- var db$ = connect(db.url);
- db$.subscribe(
- _ => {
- if (__DEV__) {
- console.log('Connected to database on', url);
- }
- },
- err => console.log('Database connection error:', err.message, err.stack)
- );
- return new QueryBuilder(db$);
- }
- close(): any {
- this._db$.dispose();
- return QueryBuilder;
- }
- collection(name: string): QueryBuilder {
- var ss = assign({}, this._selectors, { collection: name });
- return new QueryBuilder(this._db$, ss);
- }
- select(query?: Object = {}, opts?: Object = {}): QueryBuilder {
- var ss = assign({}, this._selectors, { query, opts });
- return new QueryBuilder(this._db$, ss);
- }
- selectOne(query?: Object = {}, opts?: Object = {}): QueryBuilder {
- var ss = assign({}, this._selectors, {
- query: query,
- opts: opts,
- limit: 1
- });
- return new QueryBuilder(this._db$, ss);
- }
- sort(sort: Object): QueryBuilder {
- var ss = assign({}, this._selectors, { sort });
- return new QueryBuilder(this._db$, ss);
- }
- skip(offset: number): QueryBuilder {
- var ss = assign({}, this._selectors, { offset });
- return new QueryBuilder(this._db$, ss);
- }
- limit(limit: number = 0): QueryBuilder {
- var ss = assign({}, this._selectors, { limit });
- return new QueryBuilder(this._db$, ss);
- }
- exec(): Observable {
- var ss = this._selectors;
- var db$ = this._db$
- if (!ss.collection) return Observable.throw('You have to provide collection name.');
- if (!db$) return Observable.throw('No db connection found.');
- var o = db$.flatMapLatest(db => {
- var c = db.collection(ss.collection);
- var cursor = c.find(ss.query, ss.opts).sort(ss.sort).skip(ss.offset).limit(ss.limit);
- var obs = Observable.fromNodeCallback(cursor.toArray, cursor);
- return obs();
- });
- return ss.limit === 1 ? o.map(res => res[0]) : o;
- }
- }
// Loading this module connects immediately; the resulting builder is the
// module's sole export.
var sharedBuilder = QueryBuilder.connect(URL);
module.exports = sharedBuilder;
Advertisement
Add Comment
Please sign in to add a comment.
Advertisement