Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- const rxjs = require('rxjs'),
- from = require('from2'),
- through = require('through2');
/**
 * Wraps a Node.js stream so it can be used as an RxJS operator function.
 *
 * The stream is paused immediately (when it supports `pause`) so that no data
 * is emitted before an observer is attached; it is resumed on subscription.
 *
 * @param {object} stream - Stream-like object. Must support
 *   `addListener`/`removeListener`; `write`/`end`/`emit` are used when a
 *   source is supplied; `pause`, `resume` and `destroy` are optional.
 * @returns {function(object=): object} Function taking an optional source
 *   Observable and returning an Observable that:
 *   - writes every source value into `stream`, ends `stream` when the source
 *     completes, and emits a stream `'error'` when the source errors;
 *   - forwards the stream's `'data'`, `'end'` and `'error'` events to the
 *     observer as `next`, `complete` and `error`.
 *
 * On teardown the source subscription is released, the listeners are removed
 * and the stream is destroyed (when it supports `destroy`), so a wrapped
 * stream cannot be reused after the first unsubscribe.
 */
const rxStreamDuplex = (stream) => {
  // Hold data back until somebody actually subscribes.
  if (stream.pause) {
    stream.pause();
  }

  return (source) =>
    new rxjs.Observable((observer) => {
      const onData = (data) => observer.next(data);
      const onEnd = () => observer.complete();
      const onError = (err) => observer.error(err);

      stream.addListener('data', onData);
      stream.addListener('end', onEnd);
      stream.addListener('error', onError);

      // Keep the upstream subscription so teardown can release it; otherwise
      // the source would keep writing into a stream that was already destroyed.
      const upstream = source
        ? source.subscribe({
            next: (chunk) => stream.write(chunk),
            // Routed through the stream so it reaches the observer via onError.
            error: (err) => stream.emit('error', err),
            complete: () => stream.end(),
          })
        : null;

      if (stream.resume) {
        stream.resume();
      }

      return () => {
        if (upstream) {
          upstream.unsubscribe();
        }
        stream.removeListener('data', onData);
        stream.removeListener('end', onEnd);
        stream.removeListener('error', onError);
        if (stream.destroy) {
          stream.destroy();
        }
      };
    });
};
/**
 * Turns a readable Node.js stream into an Observable of its chunks.
 *
 * Equivalent to building the operator with `rxStreamDuplex` and invoking it
 * without a source, so nothing is written into the stream.
 *
 * @param {object} stream - Stream to read from.
 * @returns {object} Observable produced by the `rxStreamDuplex` operator.
 */
const rxStream = (stream) => {
  const asOperator = rxStreamDuplex(stream);
  return asOperator();
};
// Patch Observable#pipe so stream-like arguments can be passed directly:
// anything exposing both `write` and `pipe` is wrapped with rxStreamDuplex,
// every other argument is handed to the original pipe untouched.
rxjs.Observable.prototype.pipe = ((originalPipe) => {
  const toOperator = (candidate) =>
    candidate.write && candidate.pipe ? rxStreamDuplex(candidate) : candidate;

  // Regular function (not an arrow) so `this` is the Observable being piped.
  return function (...operators) {
    return originalPipe.apply(this, operators.map(toOperator));
  };
})(rxjs.Observable.prototype.pipe);
/**
 * Creates a readable stream (via `from`) that emits `count` random values,
 * each as a Buffer holding a lowercase hexadecimal string, and then ends.
 *
 * @param {number} [count=10] - Number of chunks to emit. Zero, negative or
 *   non-numeric values produce a stream that ends immediately.
 * @returns {object} Whatever `from` returns for the supplied read function.
 */
const randomValues = (count = 10) => {
  // Track progress in a local instead of mutating the parameter.
  let remaining = count;

  return from((size, next) => {
    process.nextTick(() => {
      // Explicit `> 0` check: a plain truthiness test never ends the stream
      // for negative or fractional counts because they never hit exactly 0.
      if (!(remaining > 0)) {
        next(null, null);
        return;
      }
      remaining -= 1;
      const hex = (~~(Math.random() * 0xfffffff)).toString(16);
      next(null, Buffer.from(hex, 'utf8'));
    });
  });
};
/**
 * Creates a transform stream (via `through`) that prefixes every chunk
 * passing through it with `text`.
 *
 * @param {string} text - Prefix, encoded as UTF-8, placed before each chunk.
 * @returns {object} Whatever `through` returns for the supplied transform.
 */
const prependTest = (text) => {
  // The prefix never changes, so encode it once instead of once per chunk.
  const prefix = Buffer.from(text, 'utf8');

  return through((chunk, enc, next) => {
    next(null, Buffer.concat([prefix, chunk]));
  });
};
// Demo: run the same transform through a plain Node.js pipeline and through
// an RxJS pipeline, printing every resulting chunk as text.
const printChunk = (chunk) => console.log(chunk.toString());

// Plain Node.js streams: readable -> transform, consumed via 'data' events.
const nodePipeline = randomValues().pipe(
  prependTest('Random value with Node Stream pipeline ')
);
nodePipeline.on('data', printChunk);

// RxJS: the readable becomes an Observable and the transform stream is
// passed straight to the patched Observable#pipe.
const rxPipeline = rxStream(randomValues()).pipe(
  prependTest('Random value with RxJS pipeline ')
);
rxPipeline.subscribe(printChunk);
- /*****
- Random value with Node Stream pipeline b72b110
- Random value with RxJS pipeline 411117d
- Random value with Node Stream pipeline e31eac2
- Random value with RxJS pipeline d206325
- Random value with Node Stream pipeline eb6465f
- Random value with RxJS pipeline c5f06cd
- Random value with Node Stream pipeline ef4f779
- Random value with RxJS pipeline 93ff4ad
- Random value with Node Stream pipeline a859adf
- Random value with RxJS pipeline 22f3062
- Random value with Node Stream pipeline 9ad1124
- Random value with RxJS pipeline b1a2c56
- Random value with Node Stream pipeline 25c3961
- Random value with RxJS pipeline 72f230d
- Random value with Node Stream pipeline ae2694b
- Random value with RxJS pipeline aa63fb9
- Random value with Node Stream pipeline a8dc1f3
- Random value with RxJS pipeline 1ee4b04
- Random value with Node Stream pipeline a66c721
- Random value with RxJS pipeline 7675fdd
- *****/
Add Comment
Please, Sign In to add comment