Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Diagnostics;
- using System.Reactive.Linq;
- using System.Threading;
- using System.Threading.Tasks;
namespace DrvTest {
    /// <summary>
    /// Small console experiment: two endlessly-emitting "drivers" are merged into one
    /// stream and pushed through a deliberately slow transform, while each driver logs
    /// how long its <c>OnNext</c> call took to return.
    /// </summary>
    class Program {
        /// <summary>
        /// Builds the pipeline, keeps it running until a key is pressed, then disposes
        /// the subscription.
        /// </summary>
        /// <param name="args">Command-line arguments; not read by this program.</param>
        static void Main(string[] args) {
            IObservable<string> first = CreateDriver("driver1");
            IObservable<string> second = CreateDriver("driver2");

            IObservable<string> merged = first.Merge(second);
            // Disabled in the original; kept here for experimentation:
            //merged = merged.ObserveOn(new EventLoopScheduler());
            IObservable<string> transformed = merged.Select(Transform);

            // No handlers are attached: the output of interest is the console logging
            // done by the drivers and by Transform.
            IDisposable subscription = transformed.Subscribe();

            Console.ReadKey();
            subscription.Dispose();
        }

        /// <summary>
        /// Creates an observable that, once subscribed, starts a task which keeps
        /// emitting <paramref name="id"/> until the subscription's cancellation token
        /// is signalled.
        /// </summary>
        /// <param name="id">Value emitted on every iteration; also used in the log line.</param>
        /// <returns>An observable sequence that repeatedly yields <paramref name="id"/>.</returns>
        private static IObservable<string> CreateDriver(string id) {
            return Observable.Create<string>(
                (observer, token) => Task.Run(() => Pump(id, observer, token), token));
        }

        /// <summary>
        /// Emission loop of a driver: pushes <paramref name="id"/> to the observer and
        /// reports the duration of each <c>OnNext</c> call on the console.
        /// </summary>
        private static void Pump(string id, IObserver<string> observer, CancellationToken token) {
            while (!token.IsCancellationRequested) {
                // A fresh stopwatch per emission, so each reading covers one call only.
                var timer = Stopwatch.StartNew();
                observer.OnNext(id);
                timer.Stop();

                long blockedMs = timer.ElapsedMilliseconds;
                Console.WriteLine($"CALL OnNext({id}) blocked for {blockedMs} ms");
            }
        }

        /// <summary>
        /// Stand-in for an expensive transformation: announces the event, sleeps for
        /// one second, and hands the value back unchanged.
        /// </summary>
        /// <param name="id">The event value to "process".</param>
        /// <returns>The same <paramref name="id"/> that was passed in.</returns>
        private static string Transform(string id) {
            const int simulatedWorkMs = 1000;

            Console.WriteLine($"Processing event for {id}");
            Thread.Sleep(simulatedWorkMs); // simulate slow transform
            return id;
        }
    }
}
Add Comment
Please, Sign In to add comment