|
|
@@ -289,6 +289,67 @@ t_subscribe_inuse(_) ->
|
|
|
with_connection(TopicIdInuseViaHttp),
|
|
|
with_connection(SubscriptionInuseViaHttp).
|
|
|
|
|
|
+t_receive_from_mqtt_publish(_) ->
|
|
|
+ with_connection(fun(Sock) ->
|
|
|
+ ok = send_connection_frame(Sock, <<"guest">>, <<"guest">>),
|
|
|
+ ?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)),
|
|
|
+
|
|
|
+ ok = send_subscribe_frame(Sock, 0, <<"/queue/foo">>),
|
|
|
+ ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
|
|
|
+
|
|
|
+ %% send mqtt publish with content-type
|
|
|
+ Msg = emqx_message:make(
|
|
|
+ _From = from_testsuite,
|
|
|
+ _QoS = 1,
|
|
|
+ _Topic = <<"/queue/foo">>,
|
|
|
+ _Payload = <<"hello">>,
|
|
|
+ _Flags = #{},
|
|
|
+ _Headers = #{properties => #{'Content-Type' => <<"application/json">>}}
|
|
|
+ ),
|
|
|
+ emqx:publish(Msg),
|
|
|
+
|
|
|
+ {ok, Frame} = recv_a_frame(Sock),
|
|
|
+ ?assertEqual(
|
|
|
+ <<"application/json">>,
|
|
|
+ proplists:get_value(<<"content-type">>, Frame#stomp_frame.headers)
|
|
|
+ ),
|
|
|
+
|
|
|
+ ?assertMatch(
|
|
|
+ #stomp_frame{
|
|
|
+ command = <<"MESSAGE">>,
|
|
|
+ headers = _,
|
|
|
+ body = <<"hello">>
|
|
|
+ },
|
|
|
+ Frame
|
|
|
+ ),
|
|
|
+ lists:foreach(
|
|
|
+ fun({Key, Val}) ->
|
|
|
+ Val = proplists:get_value(Key, Frame#stomp_frame.headers)
|
|
|
+ end,
|
|
|
+ [
|
|
|
+ {<<"destination">>, <<"/queue/foo">>},
|
|
|
+ {<<"subscription">>, <<"0">>}
|
|
|
+ ]
|
|
|
+ ),
|
|
|
+
|
|
|
+ %% assert subscription stats
|
|
|
+ [ClientInfo1] = clients(),
|
|
|
+ ?assertMatch(#{subscriptions_cnt := 1}, ClientInfo1),
|
|
|
+
|
|
|
+ %% Unsubscribe
|
|
|
+ ok = send_unsubscribe_frame(Sock, 0),
|
|
|
+ ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
|
|
|
+
|
|
|
+ %% assert subscription stats
|
|
|
+ [ClientInfo2] = clients(),
|
|
|
+ ?assertMatch(#{subscriptions_cnt := 0}, ClientInfo2),
|
|
|
+
|
|
|
+ ok = send_message_frame(Sock, <<"/queue/foo">>, <<"You will not receive this msg">>),
|
|
|
+ ?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
|
|
|
+
|
|
|
+ {error, timeout} = gen_tcp:recv(Sock, 0, 500)
|
|
|
+ end).
|
|
|
+
|
|
|
t_transaction(_) ->
|
|
|
with_connection(fun(Sock) ->
|
|
|
gen_tcp:send(
|