|
|
@@ -33,7 +33,7 @@ end_per_suite(_Config) ->
|
|
|
emqx_ct_helpers:stop_apps([]).
|
|
|
|
|
|
t_on_client_connected(_) ->
|
|
|
- ?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/%c/%u">>, ?QOS_0}])),
|
|
|
+ ?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/%c/%u">>, #{qos => ?QOS_0}}])),
|
|
|
{ok, C} = emqtt:start_link([{host, "localhost"},
|
|
|
{clientid, "myclient"},
|
|
|
{username, "admin"}]),
|
|
|
@@ -43,18 +43,42 @@ t_on_client_connected(_) ->
|
|
|
?assertEqual(<<"connected/myclient/admin">>, Topic),
|
|
|
?assertEqual(<<"Hello world">>, Payload),
|
|
|
ok = emqtt:disconnect(C),
|
|
|
- ?assertEqual(ok, emqx_mod_subscription:unload([{<<"connected/%c/%u">>, ?QOS_0}])).
|
|
|
+ ?assertEqual(ok, emqx_mod_subscription:unload([{<<"connected/%c/%u">>, #{qos => ?QOS_0}}])).
|
|
|
|
|
|
t_on_undefined_client_connected(_) ->
|
|
|
- ?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/undefined">>, ?QOS_0}])),
|
|
|
+ ?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/undefined">>, #{qos => ?QOS_1}}])),
|
|
|
{ok, C} = emqtt:start_link([{host, "localhost"}]),
|
|
|
{ok, _} = emqtt:connect(C),
|
|
|
- emqtt:publish(C, <<"connected/undefined">>, <<"Hello world">>, ?QOS_0),
|
|
|
+ emqtt:publish(C, <<"connected/undefined">>, <<"Hello world">>, ?QOS_1),
|
|
|
{ok, #{topic := Topic, payload := Payload}} = receive_publish(100),
|
|
|
?assertEqual(<<"connected/undefined">>, Topic),
|
|
|
?assertEqual(<<"Hello world">>, Payload),
|
|
|
ok = emqtt:disconnect(C),
|
|
|
- ?assertEqual(ok, emqx_mod_subscription:unload([{<<"connected/undefined">>, ?QOS_0}])).
|
|
|
+ ?assertEqual(ok, emqx_mod_subscription:unload([{<<"connected/undefined">>, #{qos => ?QOS_1}}])).
|
|
|
+
|
|
|
+t_suboption(_) ->
|
|
|
+ Client_info = fun(Key, Client) -> maps:get(Key, maps:from_list(emqtt:info(Client)), undefined) end,
|
|
|
+ Suboption = #{qos => ?QOS_2, nl => 1, rap => 1, rh => 2},
|
|
|
+ ?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/%c/%u">>, Suboption}])),
|
|
|
+ {ok, C1} = emqtt:start_link([{proto_ver, v5}]),
|
|
|
+ {ok, _} = emqtt:connect(C1),
|
|
|
+ timer:sleep(200),
|
|
|
+ [CPid1] = emqx_cm:lookup_channels(Client_info(clientid, C1)),
|
|
|
+ [ Sub1 | _ ] = ets:lookup(emqx_subscription,CPid1),
|
|
|
+ [ Suboption1 | _ ] = ets:lookup(emqx_suboption,Sub1),
|
|
|
+ ?assertMatch({Sub1, #{qos := 2, nl := 1, rap := 1, rh := 2, subid := _}}, Suboption1),
|
|
|
+ ok = emqtt:disconnect(C1),
|
|
|
+ %% The subscription option is not valid for MQTT V3.1.1
|
|
|
+ {ok, C2} = emqtt:start_link([{proto_ver, v4}]),
|
|
|
+ {ok, _} = emqtt:connect(C2),
|
|
|
+ timer:sleep(200),
|
|
|
+ [CPid2] = emqx_cm:lookup_channels(Client_info(clientid, C2)),
|
|
|
+ [ Sub2 | _ ] = ets:lookup(emqx_subscription,CPid2),
|
|
|
+ [ Suboption2 | _ ] = ets:lookup(emqx_suboption,Sub2),
|
|
|
+ ok = emqtt:disconnect(C2),
|
|
|
+ ?assertMatch({Sub2, #{qos := 2, nl := 0, rap := 0, rh := 0, subid := _}}, Suboption2),
|
|
|
+
|
|
|
+ ?assertEqual(ok, emqx_mod_subscription:unload([{<<"connected/undefined">>, Suboption}])).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Internal functions
|
|
|
@@ -66,4 +90,3 @@ receive_publish(Timeout) ->
|
|
|
after
|
|
|
Timeout -> {error, timeout}
|
|
|
end.
|
|
|
-
|