|
|
@@ -91,7 +91,13 @@ groups() ->
|
|
|
t_sqlparse_new_map,
|
|
|
t_sqlparse_invalid_json
|
|
|
]},
|
|
|
- {events, [], [t_events]},
|
|
|
+ {events, [], [
|
|
|
+ t_events,
|
|
|
+ t_event_client_disconnected_normal,
|
|
|
+ t_event_client_disconnected_kicked,
|
|
|
+ t_event_client_disconnected_discarded,
|
|
|
+ t_event_client_disconnected_takenover
|
|
|
+ ]},
|
|
|
{telemetry, [], [
|
|
|
t_get_basic_usage_info_0,
|
|
|
t_get_basic_usage_info_1
|
|
|
@@ -474,6 +480,165 @@ t_events(_Config) ->
|
|
|
client_connack_failed(),
|
|
|
ok.
|
|
|
|
|
|
+t_event_client_disconnected_normal(_Config) ->
|
|
|
+ SQL =
|
|
|
+ "select * "
|
|
|
+ "from \"$events/client_disconnected\" ",
|
|
|
+ RepubT = <<"repub/to/disconnected/normal">>,
|
|
|
+
|
|
|
+ {ok, TopicRule} = emqx_rule_engine:create_rule(
|
|
|
+ #{
|
|
|
+ sql => SQL,
|
|
|
+ id => ?TMP_RULEID,
|
|
|
+ actions => [republish_action(RepubT, <<>>)]
|
|
|
+ }
|
|
|
+ ),
|
|
|
+
|
|
|
+ {ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
|
|
|
+ {ok, _} = emqtt:connect(Client),
|
|
|
+ {ok, _, _} = emqtt:subscribe(Client, RepubT, 0),
|
|
|
+ ct:sleep(200),
|
|
|
+ {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
|
|
+ {ok, _} = emqtt:connect(Client1),
|
|
|
+ emqtt:disconnect(Client1),
|
|
|
+
|
|
|
+ receive
|
|
|
+ {publish, #{topic := T, payload := Payload}} ->
|
|
|
+ ?assertEqual(RepubT, T),
|
|
|
+ ?assertMatch(#{<<"reason">> := <<"normal">>}, emqx_json:decode(Payload, [return_maps]))
|
|
|
+ after 1000 ->
|
|
|
+ ct:fail(wait_for_repub_disconnected_normal)
|
|
|
+ end,
|
|
|
+ emqtt:stop(Client),
|
|
|
+
|
|
|
+ delete_rule(TopicRule).
|
|
|
+
|
|
|
+t_event_client_disconnected_kicked(_Config) ->
|
|
|
+ SQL =
|
|
|
+ "select * "
|
|
|
+ "from \"$events/client_disconnected\" ",
|
|
|
+ RepubT = <<"repub/to/disconnected/kicked">>,
|
|
|
+
|
|
|
+ {ok, TopicRule} = emqx_rule_engine:create_rule(
|
|
|
+ #{
|
|
|
+ sql => SQL,
|
|
|
+ id => ?TMP_RULEID,
|
|
|
+ actions => [republish_action(RepubT, <<>>)]
|
|
|
+ }
|
|
|
+ ),
|
|
|
+
|
|
|
+ {ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
|
|
|
+ {ok, _} = emqtt:connect(Client),
|
|
|
+ {ok, _, _} = emqtt:subscribe(Client, RepubT, 0),
|
|
|
+ ct:sleep(200),
|
|
|
+
|
|
|
+ {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
|
|
+ {ok, _} = emqtt:connect(Client1),
|
|
|
+ %% the process will receive {'EXIT',{shutdown,tcp_closed}}
|
|
|
+ unlink(Client1),
|
|
|
+
|
|
|
+ emqx_cm:kick_session(<<"emqx">>),
|
|
|
+
|
|
|
+ receive
|
|
|
+ {publish, #{topic := T, payload := Payload}} ->
|
|
|
+ ?assertEqual(RepubT, T),
|
|
|
+ ?assertMatch(#{<<"reason">> := <<"kicked">>}, emqx_json:decode(Payload, [return_maps]))
|
|
|
+ after 1000 ->
|
|
|
+ ct:fail(wait_for_repub_disconnected_kicked)
|
|
|
+ end,
|
|
|
+
|
|
|
+ emqtt:stop(Client),
|
|
|
+ delete_rule(TopicRule).
|
|
|
+
|
|
|
+t_event_client_disconnected_discarded(_Config) ->
|
|
|
+ SQL =
|
|
|
+ "select * "
|
|
|
+ "from \"$events/client_disconnected\" ",
|
|
|
+ RepubT = <<"repub/to/disconnected/discarded">>,
|
|
|
+
|
|
|
+ {ok, TopicRule} = emqx_rule_engine:create_rule(
|
|
|
+ #{
|
|
|
+ sql => SQL,
|
|
|
+ id => ?TMP_RULEID,
|
|
|
+ actions => [republish_action(RepubT, <<>>)]
|
|
|
+ }
|
|
|
+ ),
|
|
|
+
|
|
|
+ {ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
|
|
|
+ {ok, _} = emqtt:connect(Client),
|
|
|
+ {ok, _, _} = emqtt:subscribe(Client, RepubT, 0),
|
|
|
+ ct:sleep(200),
|
|
|
+
|
|
|
+ {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
|
|
+ {ok, _} = emqtt:connect(Client1),
|
|
|
+ %% the process will receive {'EXIT',{shutdown,tcp_closed}}
|
|
|
+ unlink(Client1),
|
|
|
+
|
|
|
+ {ok, Client2} = emqtt:start_link([
|
|
|
+ {clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, true}
|
|
|
+ ]),
|
|
|
+ {ok, _} = emqtt:connect(Client2),
|
|
|
+
|
|
|
+ receive
|
|
|
+ {publish, #{topic := T, payload := Payload}} ->
|
|
|
+ ?assertEqual(RepubT, T),
|
|
|
+ ?assertMatch(
|
|
|
+ #{<<"reason">> := <<"discarded">>}, emqx_json:decode(Payload, [return_maps])
|
|
|
+ )
|
|
|
+ after 1000 ->
|
|
|
+ ct:fail(wait_for_repub_disconnected_discarded)
|
|
|
+ end,
|
|
|
+ emqtt:stop(Client),
|
|
|
+ emqtt:stop(Client2),
|
|
|
+
|
|
|
+ delete_rule(TopicRule).
|
|
|
+
|
|
|
+t_event_client_disconnected_takenover(_Config) ->
|
|
|
+ SQL =
|
|
|
+ "select * "
|
|
|
+ "from \"$events/client_disconnected\" ",
|
|
|
+ RepubT = <<"repub/to/disconnected/takenover">>,
|
|
|
+
|
|
|
+ {ok, TopicRule} = emqx_rule_engine:create_rule(
|
|
|
+ #{
|
|
|
+ sql => SQL,
|
|
|
+ id => ?TMP_RULEID,
|
|
|
+ actions => [republish_action(RepubT, <<>>)]
|
|
|
+ }
|
|
|
+ ),
|
|
|
+
|
|
|
+ {ok, ClientRecv} = emqtt:start_link([
|
|
|
+ {clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}
|
|
|
+ ]),
|
|
|
+ {ok, _} = emqtt:connect(ClientRecv),
|
|
|
+ {ok, _, _} = emqtt:subscribe(ClientRecv, RepubT, 0),
|
|
|
+ ct:sleep(200),
|
|
|
+
|
|
|
+ {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
|
|
|
+ {ok, _} = emqtt:connect(Client1),
|
|
|
+ %% the process will receive {'EXIT',{shutdown,tcp_closed}}
|
|
|
+ unlink(Client1),
|
|
|
+
|
|
|
+ {ok, Client2} = emqtt:start_link([
|
|
|
+ {clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false}
|
|
|
+ ]),
|
|
|
+ {ok, _} = emqtt:connect(Client2),
|
|
|
+
|
|
|
+ receive
|
|
|
+ {publish, #{topic := T, payload := Payload}} ->
|
|
|
+ ?assertEqual(RepubT, T),
|
|
|
+ ?assertMatch(
|
|
|
+ #{<<"reason">> := <<"takenover">>}, emqx_json:decode(Payload, [return_maps])
|
|
|
+ )
|
|
|
+ after 1000 ->
|
|
|
+ ct:fail(wait_for_repub_disconnected_discarded)
|
|
|
+ end,
|
|
|
+
|
|
|
+ emqtt:stop(ClientRecv),
|
|
|
+ emqtt:stop(Client2),
|
|
|
+
|
|
|
+ delete_rule(TopicRule).
|
|
|
+
|
|
|
client_connack_failed() ->
|
|
|
{ok, Client} = emqtt:start_link(
|
|
|
[
|