Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- Private Function ObserveUDP() As IObservable(Of bytes())
- Dim f = Function(observer)
- Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
- Dim client = New UdpClient(endpoint)
- Dim obs = observable.*emphasized text*Generate(Of Task(Of UdpReceiveResult), UdpReceiveResult) _
- ( Nothing _
- , Function(task As Task(Of UdpReceiveResult)) task Is Nothing Or Not task.IsCompleted() _
- , Function(task As Task(Of UdpReceiveResult)) client.ReceiveAsync() _
- , Function(task As Task(Of UdpReceiveResult)) task.Result)
- Dim observable = obs.Select(Function(r) r.Buffer)
- dim handle = observable.Subscribe(observer)
- Dim df = Sub()
- client.Close()
- handle.Dispose()
- End Sub
- Return Disposable.Create(df)
- End Function
- Return observable.Create(f)
- End Function
- ''' initializer - a function that initializes and returns the state object
- ''' generator - a function that asynchronously using await generates each value
- ''' finalizer - a function for cleaning up the state object when the sequence is unsubscribed
- Private Function ObservableAsyncSeq(Of T, I)( _
- initializer As Func(Of I), _
- generator As Func(Of I, Task(Of T)), _
- finalizer As Action(Of I)) As IObservable(Of T)
- Dim q = Function(observer As IObserver(Of T))
- Dim go = True
- Try
- Dim r = Async Sub()
- Dim ii As I = initializer()
- While go
- Dim result = Await generator(ii)
- observer.OnNext(result)
- End While
- finalizer(ii)
- observer.OnCompleted()
- End Sub
- Task.Run(r)
- Catch ex As Exception
- observer.OnError(ex)
- End Try
- ' Disposable for stopping the sequence as per
- ' the observable contract
- Return Sub() go = False
- End Function
- Return Observable.Create(q)
- End Function
- Private Function ObserveMeasurementPoints2() As IObservable(Of ProcessedDate)
- Dim initializer = Function()
- Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
- Return New UdpClient(endpoint)
- End Function
- Dim finalizer = Function(client As UdpClient)
- client.Close()
- End Function
- Dim generator = Function(client As UdpClient) As Task(Of UdpReceiveResult)
- Return client.ReceiveAsync()
- End Function
- Return ObservableAsyncSeq(initializer, generator, finalizer).Select(Function(r) ProcessBytes(r.Buffer))
- End Function
- Public Shared Function UdpStream(Of T)(endpoint As IPEndPoint, processor As Func(Of Byte(), T)) As IObservable(Of T)
- Return Observable.Using(Of T, UdpClient)(
- Function() New UdpClient(endpoint),
- Function(udpClient) _
- Observable.Defer(Function() udpClient.ReceiveAsync().ToObservable()) _
- .Repeat() _
- .Select(Function(result) processor(result.Buffer))
- )
- End Function
- Public Shared Function UdpStream(Of T)(endpoint As IPEndPoint, processor As Func(Of Byte(), T)) As IObservable(Of T)
- Return Observable.Using(
- Function() New UdpClient(endpoint),
- Function(udpClient) Observable.Defer( _
- Observable.FromAsyncPattern(
- AddressOf udpClient.BeginReceive,
- Function(iar)
- Dim remoteEp = TryCast(iar.AsyncState, IPEndPoint)
- Return udpClient.EndReceive(iar, remoteEp)
- End Function)
- ).Repeat() _
- .Select(processor)
- )
- End Function
- Shared Sub Main()
- Using UdpStream(New IPEndPoint(IPAddress.Loopback, 13200),
- Function(bytes) String.Join(",", bytes)
- ).Subscribe(AddressOf Console.WriteLine)
- Console.ReadLine()
- End Using
- Console.WriteLine("Done")
- Console.ReadKey()
- End Sub
- void IDisposable.Dispose()
- {
- this.Dispose(true);
- }
- public void Close()
- {
- this.Dispose(true);
- }
- Public Shared Function Using(Of TSource, TResource As IDisposable)(
- ByVal resourceFactory As Func(Of TResource),
- ByVal observableFactory As Func(Of TResource, IObservable(Of TSource)))
- As IObservable(Of TSource)
- using System;
- using System.IO;
- using System.Reactive.Concurrency;
- using System.Reactive.Disposables;
- using System.Reactive.Linq;
- namespace MyLib
- {
- public static class ObservableExtensions
- {
- //TODO: Could potentially upgrade to using tasks/Await-LC
- public static IObservable<byte> ToObservable(
- this Stream source,
- int buffersize,
- IScheduler scheduler)
- {
- var bytes = Observable.Create<byte>(o =>
- {
- var initialState = new StreamReaderState(source, buffersize);
- var currentStateSubscription = new SerialDisposable();
- Action<StreamReaderState, Action<StreamReaderState>> iterator =
- (state, self) =>
- currentStateSubscription.Disposable = state.ReadNext()
- .Subscribe(
- bytesRead =>
- {
- for (int i = 0; i < bytesRead; i++)
- {
- o.OnNext(state.Buffer[i]);
- }
- if (bytesRead > 0)
- self(state);
- else
- o.OnCompleted();
- },
- o.OnError);
- var scheduledWork = scheduler.Schedule(initialState, iterator);
- return new CompositeDisposable(currentStateSubscription, scheduledWork);
- });
- return Observable.Using(() => source, _ => bytes);
- }
- private sealed class StreamReaderState
- {
- private readonly int _bufferSize;
- private readonly Func<byte[], int, int, IObservable<int>> _factory;
- public StreamReaderState(Stream source, int bufferSize)
- {
- _bufferSize = bufferSize;
- _factory = Observable.FromAsyncPattern<byte[], int, int, int>(
- source.BeginRead,
- source.EndRead);
- Buffer = new byte[bufferSize];
- }
- public IObservable<int> ReadNext()
- {
- return _factory(Buffer, 0, _bufferSize);
- }
- public byte[] Buffer { get; set; }
- }
- }
- }
Add Comment
Please, Sign In to add comment