Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
%% Manual smoke test for amqp10_client. Paste into an Erlang shell as one
%% expression sequence: it opens a TLS connection, begins a session, sends
%% messages to a queue through a sender link and reads them back through a
%% receiver link using different credit settings. The final flush() is a
%% shell command, so this is not meant to be compiled as a module.
%%
%% Prerequisites, left disabled exactly as in the original paste:
%% application:ensure_all_started(ssl),
%% amqp10_client_sup:start_link(),

%% Placeholder endpoint, credentials and queue name; replace before running.
Address = "address",
Hostname = <<"hostname">>,
Port = 5671,
User = <<"user">>,
Password = <<"password">>,
Queue = <<"queue_name">>,
Container = Queue,

%% Waits up to 2 seconds for {amqp10_event, Event}. Event is already bound
%% when the receive runs, so only that exact event matches; on timeout the
%% supplied Exception term is thrown.
ReceiveAMQP10Event =
    fun(Event, Exception) ->
        receive
            {amqp10_event, Event} ->
                ok
        after 2000 ->
            throw(Exception)
        end
    end,

%% notify => self() asks for the amqp10 events to be sent to this process.
%% NOTE(review): TLS 1.1 is deprecated (RFC 8996); confirm the broker really
%% requires this version before reusing the setting.
OpnConf = #{
    address => Address,
    hostname => Hostname,
    port => Port,
    notify => self(),
    tls_opts => {secure_port, [{versions, ['tlsv1.1']}]},
    container_id => Container,
    sasl => {plain, User, Password}
},

io:format("amqp10_client:open_connection/1~n"),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
ReceiveAMQP10Event({connection, Connection, opened}, connection_timeout),

io:format("amqp10_client:begin_session/1~n"),
{ok, Session} = amqp10_client:begin_session(Connection),
ReceiveAMQP10Event({session, Session, begun}, session_timeout),

io:format("amqp10_client:attach_sender_link/3~n"),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"test-sender">>, Queue),
ReceiveAMQP10Event({link, Sender, attached}, attached_timeout),
ReceiveAMQP10Event({link, Sender, credited}, credited_timeout),

%% Sends one message whose body is "body: N=<N>" and waits up to 2 seconds
%% for its disposition. Throws message_rejected or disposition_timeout.
%% Every message reuses the delivery tag <<"test_msg">>; this works here
%% because each send waits for its disposition before the next one starts.
SendMsg =
    fun(N) ->
        Payload = <<"body: N=", (integer_to_binary(N))/binary>>,
        Unaddressed = amqp10_msg:new(<<"test_msg">>, Payload, false),
        Props = #{group_id => <<"test group_id">>},
        Outgoing = amqp10_msg:set_properties(Props, Unaddressed),
        ok = amqp10_client:send_msg(Sender, Outgoing),
        receive
            {amqp10_disposition, {rejected, <<"test_msg">>}} ->
                throw(message_rejected);
            {amqp10_disposition, {accepted, <<"test_msg">>}} ->
                ok
        after 2000 ->
            throw(disposition_timeout)
        end
    end,

io:format("send 1 message~n"),
SendMsg(0),

io:format("amqp10_client:attach_receiver_link/3~n"),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"test-receiver">>, Queue),
ReceiveAMQP10Event({link, Receiver, attached}, attached_timeout),

io:format("receive 1 message/1~n"),
%% Matches the returned message as a raw 9-element tuple with the body in
%% position 8. NOTE(review): this depends on the amqp10_msg record layout of
%% the client version in use - confirm when upgrading.
ExpectedBody = [{'v1_0.data', <<"body: N=0">>}],
{ok, {amqp10_msg, _, _, _, _, _, _, ExpectedBody, _}} = amqp10_client:get_msg(Receiver),

io:format("send 10 messages~n"),
lists:foreach(SendMsg, lists:seq(1, 10)),

%% Accepts every {amqp10_msg, Receiver, _} found in the mailbox and returns
%% how many were seen once 2 seconds pass without a new one.
CountReceivedMessages =
    fun() ->
        Drain =
            fun Loop(Count) ->
                receive
                    {amqp10_msg, Receiver, Delivery} ->
                        amqp10_client:accept_msg(Receiver, Delivery),
                        Loop(Count + 1)
                after 2000 ->
                    Count
                end
            end,
        Drain(0)
    end,

io:format("receive 3 messages~n"),
%% Grant exactly 3 credits with no automatic renewal.
ok = amqp10_client:flow_link_credit(Receiver, 3, never),
ReceiveAMQP10Event({link, Receiver, credit_exhausted}, exhausted_timeout),
3 = CountReceivedMessages(),

io:format("receive all the messages~n"),
%% Grant 2 credits with a renewal threshold of 1; the remaining 7 messages
%% are expected to arrive.
%% NOTE(review): confirm that credit_exhausted is still emitted when credit
%% is renewed automatically; otherwise the wait below throws
%% exhausted_timeout.
ok = amqp10_client:flow_link_credit(Receiver, 2, 1),
ReceiveAMQP10Event({link, Receiver, credit_exhausted}, exhausted_timeout),
7 = CountReceivedMessages(),

%% Teardown, left disabled exactly as in the original paste:
%% ok = amqp10_client:detach_link(Sender),
%% ok = amqp10_client:detach_link(Receiver),
%% ok = amqp10_client:end_session(Session),
%% ok = amqp10_client:close_connection(Connection),

%% Wait a second, then call the shell's flush() on the mailbox.
timer:sleep(1000), flush().
Add Comment
Please, Sign In to add comment