Guest User

Untitled

a guest
May 27th, 2018
98
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.33 KB | None | 0 0
  1. using System;
  2. using System.Diagnostics;
  3. using System.Reactive.Linq;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6.  
  7. namespace DrvTest {
  8. class Program {
  9. static void Main(string[] args) {
  10. var driver1 = CreateDriver("driver1");
  11. var driver2 = CreateDriver("driver2");
  12.  
  13. var subscription = driver1
  14. .Merge(driver2)
  15. //.ObserveOn(new EventLoopScheduler())
  16. .Select(Transform)
  17. .Subscribe();
  18.  
  19. Console.ReadKey();
  20. subscription.Dispose();
  21. }
  22.  
  23. private static IObservable<string> CreateDriver(string id) {
  24. return Observable.Create<string>((obs,cancel) => {
  25. return Task.Run(() => {
  26. while (!cancel.IsCancellationRequested) {
  27. var stopWatch = Stopwatch.StartNew();
  28. obs.OnNext(id);
  29. stopWatch.Stop();
  30. Console.WriteLine($"CALL OnNext({id}) blocked for {stopWatch.ElapsedMilliseconds} ms");
  31. }
  32. }, cancel);
  33. });
  34. }
  35.  
  36. private static string Transform(string id) {
  37. Console.WriteLine($"Processing event for {id}");
  38. Thread.Sleep(1000); // simulate slow transform..
  39. return id;
  40. }
  41. }
  42. }
Add Comment
Please, Sign In to add comment