Advertisement
Guest User

Untitled

a guest
Oct 12th, 2015
225
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.73 KB | None | 0 0
  1. #!perl6
  2.  
  3. use v6;
  4.  
  5. await IO::Socket::Async.connect('test.mosquitto.org',1883).then( -> $p {
  6. return if not $p.status;
  7. given $p.result {
  8. react {
  9. my $bs = .bytes-supply;
  10.  
  11. my Buf $buf .= new;
  12. my Supply $packets .= new;
  13.  
  14. sub parse (Buf $buf is rw) {
  15. my $offset = 1;
  16.  
  17. my $multiplier = 1;
  18. my $length = 0;
  19. my $d;
  20. {
  21. return if $offset >= $buf.elems;
  22. $d = $buf.subbuf($offset++, 1).unpack("C");
  23. dd $d;
  24. $length += ($d +& 0x7f) * $multiplier;
  25. dd $length;
  26. $multiplier *= 128;
  27. redo if $d +& 0x80;
  28. }
  29. return if $length > $buf.elems + $offset;
  30.  
  31. my $first_byte = $buf[0];
  32.  
  33. my $packet = hash {
  34. type => ($first_byte +& 0xf0) +> 4,
  35. dup => ($first_byte +& 0x08) +> 3,
  36. qos => ($first_byte +& 0x06) +> 1,
  37. retain => ($first_byte +& 0x01),
  38. data => $buf.subbuf($offset, $length);
  39. };
  40.  
  41. $buf .= subbuf($offset + $length);
  42.  
  43. $packets.emit($packet);
  44. }
  45.  
  46. .write(pack("C C n a* C C n n a*", 16, 15, 6, "MQIsdp", 3, 2, 60, 1, "a"));
  47.  
  48. $packets.tap(-> $_ {
  49. say "Received packet of type {.<type>}";
  50. });
  51.  
  52. whenever $bs -> $received {
  53. $buf ~= $received;
  54. $buf.say;
  55. parse $buf;
  56. }
  57. }
  58. .close;
  59. }
  60. });
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement