Guest User

Untitled

a guest
Jan 19th, 2019
88
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.93 KB | None | 0 0
  ''' <summary>
  ''' Streams the payload of every UDP datagram received on the endpoint described by
  ''' <c>Me.IpAdress</c> and <c>Me.IPPort</c>.
  ''' </summary>
  ''' <returns>
  ''' A cold observable: each subscription binds its own <see cref="UdpClient"/> and emits
  ''' the raw bytes of each datagram as it arrives. The socket is closed when the
  ''' subscription is disposed or the sequence terminates.
  ''' </returns>
  Private Function ObserveUDP() As IObservable(Of Byte())
      ' Using ties the socket's lifetime to the subscription, so no hand-written
      ' disposable is needed to close the client.
      Return Observable.Using(Of Byte(), UdpClient)(
          Function() New UdpClient(New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)),
          Function(client) _
              Observable.FromAsync(Of UdpReceiveResult)(Function() client.ReceiveAsync()) _
                  .Repeat() _
                  .Select(Function(received) received.Buffer))
      ' FromAsync starts a fresh ReceiveAsync per (re)subscription; Repeat re-subscribes
      ' after each datagram, giving an endless receive loop without blocking on Task.Result.
  End Function
  30.  
  ''' <summary>
  ''' Builds an observable sequence whose values are produced one at a time by an
  ''' asynchronous generator working on a per-subscription state object.
  ''' </summary>
  ''' <typeparam name="T">Type of the values pushed to the observer.</typeparam>
  ''' <typeparam name="I">Type of the state object shared by the three callbacks.</typeparam>
  ''' <param name="initializer">A function that initializes and returns the state object.</param>
  ''' <param name="generator">A function that asynchronously generates each value from the state.</param>
  ''' <param name="finalizer">A function for cleaning up the state object when the sequence ends or is unsubscribed.</param>
  ''' <returns>
  ''' An observable that, per subscription, runs the generator in a loop on the thread pool.
  ''' Any exception thrown by the callbacks is delivered through <c>OnError</c>.
  ''' </returns>
  Private Function ObservableAsyncSeq(Of T, I)( _
      initializer As Func(Of I), _
      generator As Func(Of I, Task(Of T)), _
      finalizer As Action(Of I)) As IObservable(Of T)

      Dim subscribe As Func(Of IObserver(Of T), Action) =
          Function(observer)
              Dim keepRunning = True

              ' Async Function (not Async Sub) so failures stay inside the returned task;
              ' the Try/Catch lives inside the lambda because a handler wrapped around
              ' Task.Run cannot see exceptions raised after the first Await.
              Dim pump As Func(Of Task) =
                  Async Function() As Task
                      Try
                          Dim state As I = initializer()
                          Try
                              While keepRunning
                                  Dim value = Await generator(state)
                                  observer.OnNext(value)
                              End While
                          Finally
                              ' Runs on normal exit and on generator failure alike,
                              ' so the state object is always cleaned up.
                              finalizer(state)
                          End Try
                      Catch ex As Exception
                          observer.OnError(ex)
                          Return
                      End Try

                      observer.OnCompleted()
                  End Function

              Task.Run(pump)

              ' Disposable for stopping the sequence as per
              ' the observable contract
              Return Sub() keepRunning = False
          End Function

      Return Observable.Create(Of T)(subscribe)
  End Function
  65.  
  ''' <summary>
  ''' Receives UDP datagrams on the endpoint described by <c>Me.IpAdress</c> and
  ''' <c>Me.IPPort</c> and projects each payload through <c>ProcessBytes</c>.
  ''' </summary>
  ''' <returns>
  ''' The sequence produced by <c>ObservableAsyncSeq</c>, with every received buffer
  ''' mapped by <c>ProcessBytes</c>.
  ''' </returns>
  Private Function ObserveMeasurementPoints2() As IObservable(Of ProcessedDate)
      ' Opens the socket; invoked by ObservableAsyncSeq to create its state object.
      Dim initializer As Func(Of UdpClient) =
          Function() New UdpClient(New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort))

      ' A Sub, not a Function: the cleanup callback returns nothing and must match
      ' the Action(Of I) parameter of ObservableAsyncSeq.
      Dim finalizer As Action(Of UdpClient) = Sub(client) client.Close()

      ' Each call starts one asynchronous receive on the shared client.
      Dim generator As Func(Of UdpClient, Task(Of UdpReceiveResult)) =
          Function(client) client.ReceiveAsync()

      Return ObservableAsyncSeq(initializer, generator, finalizer) _
          .Select(Function(r) ProcessBytes(r.Buffer))
  End Function
  83.  
  ''' <summary>
  ''' Exposes the datagrams arriving on a UDP endpoint as an observable sequence,
  ''' using the task-based <c>ReceiveAsync</c> API.
  ''' </summary>
  ''' <typeparam name="T">Type produced by <paramref name="processor"/>.</typeparam>
  ''' <param name="endpoint">Local endpoint the <see cref="UdpClient"/> is created with.</param>
  ''' <param name="processor">Converts the bytes of each datagram into a <typeparamref name="T"/>.</param>
  ''' <returns>A sequence of processed datagrams; the client is owned by <c>Observable.Using</c>.</returns>
  Public Shared Function UdpStream(Of T)(endpoint As IPEndPoint, processor As Func(Of Byte(), T)) As IObservable(Of T)
      Dim openSocket As Func(Of UdpClient) = Function() New UdpClient(endpoint)

      Dim receiveLoop As Func(Of UdpClient, IObservable(Of T)) =
          Function(socket)
              ' Defer postpones the ReceiveAsync call until subscription, so every
              ' repetition issues a new receive.
              Dim nextDatagram = Observable.Defer(Function() socket.ReceiveAsync().ToObservable())
              Return nextDatagram _
                  .Repeat() _
                  .Select(Function(datagram) processor(datagram.Buffer))
          End Function

      Return Observable.Using(Of T, UdpClient)(openSocket, receiveLoop)
  End Function
  93.  
  ''' <summary>
  ''' Exposes the datagrams arriving on a UDP endpoint as an observable sequence,
  ''' using the <c>BeginReceive</c>/<c>EndReceive</c> asynchronous pattern.
  ''' </summary>
  ''' <typeparam name="T">Type produced by <paramref name="processor"/>.</typeparam>
  ''' <param name="endpoint">Local endpoint the <see cref="UdpClient"/> is created with.</param>
  ''' <param name="processor">Converts the bytes of each datagram into a <typeparamref name="T"/>.</param>
  ''' <returns>A sequence of processed datagrams; the client is owned by <c>Observable.Using</c>.</returns>
  Public Shared Function UdpStream(Of T)(endpoint As IPEndPoint, processor As Func(Of Byte(), T)) As IObservable(Of T)
      Return Observable.Using(
          Function() New UdpClient(endpoint),
          Function(socket)
              ' One invocation of receiveOnce performs a single Begin/End receive pair.
              Dim receiveOnce = Observable.FromAsyncPattern(
                  AddressOf socket.BeginReceive,
                  Function(pending)
                      ' EndReceive takes the sender endpoint ByRef, so it needs a variable.
                      Dim sender = TryCast(pending.AsyncState, IPEndPoint)
                      Return socket.EndReceive(pending, sender)
                  End Function)

              Return Observable.Defer(receiveOnce) _
                  .Repeat() _
                  .Select(processor)
          End Function)
  End Function
  108.  
  ''' <summary>
  ''' Demo entry point: prints every datagram received on loopback port 13200 as a
  ''' comma-separated list of byte values until the user presses Enter.
  ''' </summary>
  Shared Sub Main()
      Dim listenOn = New IPEndPoint(IPAddress.Loopback, 13200)
      Dim datagrams = UdpStream(listenOn, Function(bytes) String.Join(",", bytes))

      ' Leaving the Using block disposes the subscription and stops the stream.
      Using subscription = datagrams.Subscribe(AddressOf Console.WriteLine)
          Console.ReadLine()
      End Using

      Console.WriteLine("Done")
      Console.ReadKey()
  End Sub
  119.  
  /// <summary>
  /// Explicit <see cref="IDisposable"/> implementation; forwards to <c>Dispose(true)</c>.
  /// </summary>
  void IDisposable.Dispose() => Dispose(true);
  124.  
  /// <summary>
  /// Closes the instance by forwarding to <c>Dispose(true)</c>, the same call the
  /// explicit <c>IDisposable.Dispose</c> implementation makes.
  /// </summary>
  public void Close() => Dispose(true);
  129.  
  ' Signature excerpt only: no body or End Function appears in this listing.
  ' NOTE(review): presumably quoted from Rx's Observable.Using for reference - confirm the origin.
  ' Declares a generic function with two type parameters, TSource and TResource
  ' (TResource is constrained to IDisposable). It takes a factory that produces a
  ' TResource and a factory that maps that TResource to an IObservable(Of TSource),
  ' and is declared to return IObservable(Of TSource).
  130. Public Shared Function Using(Of TSource, TResource As IDisposable)(
  131. ByVal resourceFactory As Func(Of TResource),
  132. ByVal observableFactory As Func(Of TResource, IObservable(Of TSource)))
  133. As IObservable(Of TSource)
  134.  
  135. using System;
  136. using System.IO;
  137. using System.Reactive.Concurrency;
  138. using System.Reactive.Disposables;
  139. using System.Reactive.Linq;
  140.  
  namespace MyLib
  {
      /// <summary>
      /// Extension methods that expose non-Rx sources as observable sequences.
      /// </summary>
      public static class ObservableExtensions
      {
          //TODO: Could potentially upgrade to using tasks/Await-LC
          /// <summary>
          /// Reads <paramref name="source"/> in chunks of up to <paramref name="buffersize"/>
          /// bytes and pushes the content to the observer one byte at a time.
          /// </summary>
          /// <param name="source">Stream to read; it is disposed when the subscription ends.</param>
          /// <param name="buffersize">Size of the read buffer in bytes; must be greater than zero.</param>
          /// <param name="scheduler">Scheduler on which the read loop is started.</param>
          /// <returns>
          /// A sequence of the stream's bytes that completes when a read returns no data and
          /// forwards read failures through <c>OnError</c>.
          /// </returns>
          /// <exception cref="ArgumentNullException">
          /// <paramref name="source"/> or <paramref name="scheduler"/> is <c>null</c>.
          /// </exception>
          /// <exception cref="ArgumentOutOfRangeException">
          /// <paramref name="buffersize"/> is zero or negative.
          /// </exception>
          public static IObservable<byte> ToObservable(
              this Stream source,
              int buffersize,
              IScheduler scheduler)
          {
              // Validate eagerly so misuse fails at the call site, not at subscribe time.
              if (source == null)
              {
                  throw new ArgumentNullException("source");
              }

              if (scheduler == null)
              {
                  throw new ArgumentNullException("scheduler");
              }

              // A zero-length buffer makes every read return 0 bytes, which the loop
              // below would misread as end-of-stream and complete without any data.
              if (buffersize <= 0)
              {
                  throw new ArgumentOutOfRangeException(
                      "buffersize", buffersize, "Buffer size must be greater than zero.");
              }

              var bytes = Observable.Create<byte>(o =>
              {
                  var initialState = new StreamReaderState(source, buffersize);

                  // Holds the subscription to the read currently in flight; assigning a
                  // new one disposes the previous one.
                  var currentStateSubscription = new SerialDisposable();

                  Action<StreamReaderState, Action<StreamReaderState>> iterator =
                      (state, self) =>
                          currentStateSubscription.Disposable = state.ReadNext()
                              .Subscribe(
                                  bytesRead =>
                                  {
                                      for (int i = 0; i < bytesRead; i++)
                                      {
                                          o.OnNext(state.Buffer[i]);
                                      }

                                      if (bytesRead > 0)
                                      {
                                          // More data may follow: schedule the next read.
                                          self(state);
                                      }
                                      else
                                      {
                                          o.OnCompleted();
                                      }
                                  },
                                  o.OnError);

                  var scheduledWork = scheduler.Schedule(initialState, iterator);
                  return new CompositeDisposable(currentStateSubscription, scheduledWork);
              });

              // Using ties the stream's lifetime to the subscription.
              return Observable.Using(() => source, _ => bytes);
          }

          /// <summary>
          /// Pairs a reusable read buffer with a factory that starts an asynchronous
          /// read of the stream into that buffer.
          /// </summary>
          private sealed class StreamReaderState
          {
              private readonly Func<byte[], int, int, IObservable<int>> _factory;

              public StreamReaderState(Stream source, int bufferSize)
              {
                  _factory = Observable.FromAsyncPattern<byte[], int, int, int>(
                      source.BeginRead,
                      source.EndRead);
                  Buffer = new byte[bufferSize];
              }

              /// <summary>Buffer that each read fills from index 0.</summary>
              public byte[] Buffer { get; private set; }

              /// <summary>
              /// Starts a read of up to <c>Buffer.Length</c> bytes into <see cref="Buffer"/>;
              /// the resulting sequence yields the number of bytes read.
              /// </summary>
              public IObservable<int> ReadNext()
              {
                  return _factory(Buffer, 0, Buffer.Length);
              }
          }
      }
  }
Add Comment
Please, Sign In to add comment