Guest User

Untitled

a guest
Apr 20th, 2018
107
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.29 KB | None | 0 0
  %% Bootstrap calls, left disabled here (presumably run once per node before
  %% the rest of the script - confirm):
  %%   application:ensure_all_started(ssl),
  %%   amqp10_client_sup:start_link(),

  %% Broker endpoint: the address is a string, the hostname a binary.
  Address = "address",
  Hostname = <<"hostname">>,
  Port = 5671,

  %% Credentials (placeholders).
  User = <<"user">>,
  Password = <<"password">>,

  %% The container value is simply the queue name.
  Queue = <<"queue_name">>,
  Container = Queue,
  11.  
  12.  
  %% Blocks until the mailbox holds `{amqp10_event, Expected}` and returns `ok`.
  %% Other messages are left in the mailbox (selective receive). If no matching
  %% event arrives within 2000 ms, TimeoutReason is thrown.
  ReceiveAMQP10Event =
      fun(Expected, TimeoutReason) ->
          receive
              {amqp10_event, Received} when Received =:= Expected ->
                  ok
          after 2000 ->
              throw(TimeoutReason)
          end
      end,
  22.  
  %% Settings map later handed to amqp10_client:open_connection/1.
  OpnConf = #{
      %% Where to connect.
      address => Address,
      port => Port,
      hostname => Hostname,
      %% Only 'tlsv1.1' is listed as an acceptable protocol version.
      tls_opts => {secure_port, [{versions, ['tlsv1.1']}]},
      %% SASL PLAIN with the user/password pair.
      sasl => {plain, User, Password},
      container_id => Container,
      %% The process evaluating this script is the notification target.
      notify => self()
  },
  32.  
  %% Open the connection and wait for its `opened` event.
  io:format("amqp10_client:open_connection/1~n"),
  {ok, Connection} = amqp10_client:open_connection(OpnConf),
  ReceiveAMQP10Event({connection, Connection, opened}, connection_timeout),

  %% Begin a session on that connection and wait for its `begun` event.
  io:format("amqp10_client:begin_session/1~n"),
  {ok, Session} = amqp10_client:begin_session(Connection),
  ReceiveAMQP10Event({session, Session, begun}, session_timeout),

  %% Attach the sender link, then wait for `attached` followed by `credited`;
  %% each wait throws its own timeout atom.
  io:format("amqp10_client:attach_sender_link/3~n"),
  {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"test-sender">>, Queue),
  lists:foreach(
      fun({LinkState, TimeoutReason}) ->
          ReceiveAMQP10Event({link, Sender, LinkState}, TimeoutReason)
      end,
      [{attached, attached_timeout}, {credited, credited_timeout}]
  ),
  46.  
  %% Sends one message whose body is "body: N=<Seq>" on the sender link and
  %% waits up to 2000 ms for its disposition.
  %% The message is created with delivery tag <<"test_msg">>, `false` as the
  %% third argument of amqp10_msg:new/3, and a group_id property.
  %% Returns `ok` when accepted; throws `message_rejected` when rejected and
  %% `disposition_timeout` when neither disposition arrives in time.
  SendMsg =
      fun(Seq) ->
          Payload = <<"body: N=", (integer_to_binary(Seq))/binary>>,
          Unsettled = amqp10_msg:new(<<"test_msg">>, Payload, false),
          Outgoing = amqp10_msg:set_properties(
              #{group_id => <<"test group_id">>},
              Unsettled
          ),
          ok = amqp10_client:send_msg(Sender, Outgoing),
          receive
              {amqp10_disposition, {accepted, <<"test_msg">>}} ->
                  ok;
              {amqp10_disposition, {rejected, <<"test_msg">>}} ->
                  throw(message_rejected)
          after 2000 ->
              throw(disposition_timeout)
          end
      end,
  64.  
  %% Publish a single message (N=0).
  io:format("send 1 message~n"),
  SendMsg(0),

  %% Attach a receiver link on the same queue and wait for its `attached` event.
  io:format("amqp10_client:attach_receiver_link/3~n"),
  {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"test-receiver">>, Queue),
  ReceiveAMQP10Event({link, Receiver, attached}, attached_timeout),

  %% Fetch one message synchronously and assert that its body is the payload
  %% sent above. The body is read through amqp10_msg:body/1 rather than by
  %% matching the raw record tuple, so the check does not depend on the
  %% record's arity or field order.
  %% The binding is deliberately not called `Msg`: a fun defined later in this
  %% script uses `Msg` as a fresh pattern variable.
  io:format("receive 1 message/1~n"),
  {ok, FirstMsg} = amqp10_client:get_msg(Receiver),
  [<<"body: N=0">>] = amqp10_msg:body(FirstMsg),
  75.  
  %% Publish messages N=1..10 one after another; any throw from SendMsg
  %% aborts the sequence.
  io:format("send 10 messages~n"),
  lists:foreach(SendMsg, lists:seq(1, 10)),
  78.  
  %% Accepts every `{amqp10_msg, Receiver, _}` message addressed to the already
  %% bound Receiver link and returns how many were handled. Counting stops the
  %% first time 2000 ms pass without such a message.
  CountReceivedMessages =
      fun() ->
          Drain =
              fun Loop(Seen) ->
                  receive
                      {amqp10_msg, Receiver, Delivery} ->
                          amqp10_client:accept_msg(Receiver, Delivery),
                          Loop(Seen + 1)
                  after 2000 ->
                      Seen
                  end
              end,
          Drain(0)
      end,
  91.  
  %% One receive phase: print Banner, call flow_link_credit/3 with Credit and
  %% RenewWhenBelow, wait for the link's `credit_exhausted` event, then assert
  %% that exactly Expected messages were counted.
  DrainAndExpect =
      fun(Banner, Credit, RenewWhenBelow, Expected) ->
          io:format(Banner),
          ok = amqp10_client:flow_link_credit(Receiver, Credit, RenewWhenBelow),
          ReceiveAMQP10Event({link, Receiver, credit_exhausted}, exhausted_timeout),
          Expected = CountReceivedMessages()
      end,

  %% First phase: credit 3 with `never` as the renewal argument, expect 3.
  DrainAndExpect("receive 3 messages~n", 3, never, 3),

  %% Second phase: credit 2 with renewal argument 1, expect the remaining 7.
  %% NOTE(review): confirm the client still emits `credit_exhausted` when a
  %% renewal threshold is given instead of `never`.
  DrainAndExpect("receive all the messages~n", 2, 1, 7),

  %% Teardown is left disabled, as in the original script:
  %% ok = amqp10_client:detach_link(Sender),
  %% ok = amqp10_client:detach_link(Receiver),
  %% ok = amqp10_client:end_session(Session),
  %% ok = amqp10_client:close_connection(Connection),

  %% Pause for a second, then run the shell's flush() on the mailbox.
  timer:sleep(1000),
  flush().
Add Comment
Please, Sign In to add comment