Guest User

Untitled

a guest
May 24th, 2018
81
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.00 KB | None | 0 0
  1. const rxjs = require('rxjs'),
  2. from = require('from2'),
  3. through = require('through2');
  4.  
  /**
   * Adapts a Node.js stream into a function that can be used like an RxJS
   * operator.
   *
   * The stream is paused right away (when it has a `pause` method) and only
   * resumed once the resulting observable is subscribed to, after the
   * 'data' / 'end' / 'error' listeners have been attached.
   *
   * @param {object} stream - Stream to adapt. `pause`, `resume` and `destroy`
   *   are optional and only called when present; `write` and `end` are called
   *   only when an upstream observable is supplied.
   * @returns {function(object=): object} Function taking an optional upstream
   *   observable and returning an observable of the stream's 'data' events.
   *   Upstream values are written into the stream, upstream completion ends
   *   it, and upstream errors are re-emitted as the stream's 'error' event.
   */
  const rxStreamDuplex = (stream) => {
    if (stream.pause) {
      stream.pause();
    }

    return (source) => new rxjs.Observable((observer) => {
      const onDataHandler = (data) => {
        observer.next(data);
      };
      const onEndHandler = () => {
        observer.complete();
      };
      const onErrorHandler = (err) => {
        observer.error(err);
      };

      stream.addListener('data', onDataHandler);
      stream.addListener('end', onEndHandler);
      stream.addListener('error', onErrorHandler);

      // Keep the upstream subscription so it can be released on teardown;
      // otherwise the source keeps running (and writing) after unsubscribe.
      let upstream = null;
      if (source) {
        // NOTE(review): the return value of write() is ignored, so upstream
        // values are pushed without honouring backpressure.
        upstream = source.subscribe({
          next: (chunk) => stream.write(chunk),
          error: (err) => stream.emit('error', err),
          complete: () => stream.end(),
        });
      }

      if (stream.resume) {
        stream.resume();
      }

      return () => {
        stream.removeListener('data', onDataHandler);
        stream.removeListener('end', onEndHandler);
        stream.removeListener('error', onErrorHandler);

        // Stop the source first so nothing is written to a destroyed stream.
        if (upstream) {
          upstream.unsubscribe();
        }

        if (stream.destroy) {
          stream.destroy();
        }
      };
    });
  };
  45.  
  /**
   * Wraps a stream as an observable with no upstream source: the adapter
   * produced by `rxStreamDuplex` is invoked without arguments.
   *
   * @param {object} stream - Stream handed to `rxStreamDuplex`.
   * @returns {*} Whatever the adapter returns when called with no source.
   */
  const rxStream = (stream) => {
    const attach = rxStreamDuplex(stream);
    return attach();
  };
  47.  
  // Patch Observable#pipe so that stream-like arguments can be mixed with
  // regular operators: any argument with truthy `write` and `pipe` properties
  // is wrapped with rxStreamDuplex, everything else is passed through as-is.
  {
    const originalPipe = rxjs.Observable.prototype.pipe;

    rxjs.Observable.prototype.pipe = function (...stages) {
      const operators = stages.map((stage) => (
        stage.write && stage.pipe ? rxStreamDuplex(stage) : stage
      ));
      // Forward `this` so the original method runs against the same observable.
      return originalPipe.apply(this, operators);
    };
  }
  51.  
  /**
   * Creates a `from2` readable stream that yields random hexadecimal strings
   * as UTF-8 buffers and then ends.
   *
   * Each read is answered on the next tick. The stream ends as soon as the
   * remaining count is no longer positive, so zero, negative, fractional and
   * NaN counts all terminate instead of producing an endless stream.
   *
   * @param {number} [count=10] - Number of values to emit before ending.
   * @returns {*} The stream returned by `from`.
   */
  const randomValues = (count = 10) => {
    // Local counter: the parameter itself is left untouched.
    let remaining = count;

    return from((size, next) => {
      process.nextTick(() => {
        if (remaining > 0) {
          remaining -= 1;
          const hex = Math.floor(Math.random() * 0xFFFFFFF).toString(16);
          next(null, Buffer.from(hex, 'utf8'));
          return;
        }
        // A null chunk signals end-of-stream to from2.
        next(null, null);
      });
    });
  };
  62.  
  /**
   * Builds a `through2` transform that prefixes every chunk with `text`.
   *
   * @param {string} text - Prefix, encoded as UTF-8 for each chunk.
   * @returns {*} The transform stream returned by `through`.
   */
  const prependTest = (text) => {
    const transform = (chunk, enc, next) => {
      const prefix = Buffer.from(text, 'utf8');
      const combined = Buffer.concat([prefix, chunk]);
      next(null, combined);
    };

    return through(transform);
  };
  71.  
  // Demo 1: plain Node.js stream pipeline; each transformed chunk is logged.
  const nodePipeline = randomValues()
    .pipe(prependTest('Random value with Node Stream pipeline '));
  nodePipeline.on('data', (d) => console.log(d.toString()));

  // Demo 2: the same transform stream used inside an RxJS pipeline via the
  // patched Observable#pipe; each emitted chunk is logged.
  const rxPipeline = rxStream(randomValues())
    .pipe(prependTest('Random value with RxJS pipeline '));
  rxPipeline.subscribe((d) => console.log(d.toString()));
  79.  
  80. /*****
  81.  
  82. Random value with Node Stream pipeline b72b110
  83. Random value with RxJS pipeline 411117d
  84. Random value with Node Stream pipeline e31eac2
  85. Random value with RxJS pipeline d206325
  86. Random value with Node Stream pipeline eb6465f
  87. Random value with RxJS pipeline c5f06cd
  88. Random value with Node Stream pipeline ef4f779
  89. Random value with RxJS pipeline 93ff4ad
  90. Random value with Node Stream pipeline a859adf
  91. Random value with RxJS pipeline 22f3062
  92. Random value with Node Stream pipeline 9ad1124
  93. Random value with RxJS pipeline b1a2c56
  94. Random value with Node Stream pipeline 25c3961
  95. Random value with RxJS pipeline 72f230d
  96. Random value with Node Stream pipeline ae2694b
  97. Random value with RxJS pipeline aa63fb9
  98. Random value with Node Stream pipeline a8dc1f3
  99. Random value with RxJS pipeline 1ee4b04
  100. Random value with Node Stream pipeline a66c721
  101. Random value with RxJS pipeline 7675fdd
  102.  
  103. *****/
Add Comment
Please, Sign In to add comment