Przeglądaj źródła

feat: support kickout clients in batch

JianBo He 2 lat temu
rodzic
commit
1e7872c319

+ 26 - 1
apps/emqx_management/src/emqx_mgmt.erl

@@ -47,6 +47,7 @@
     lookup_client/2,
     lookup_client/3,
     kickout_client/1,
+    kickout_clients/1,
     list_authz_cache/1,
     list_client_subscriptions/1,
     client_subscriptions/2,
@@ -58,7 +59,9 @@
     clean_pem_cache_all/1,
     set_ratelimit_policy/2,
     set_quota_policy/2,
-    set_keepalive/2
+    set_keepalive/2,
+
+    do_kickout_clients/1
 ]).
 
 %% Internal functions
@@ -321,6 +324,28 @@ kickout_client(ClientId) ->
 kickout_client(Node, ClientId) ->
     unwrap_rpc(emqx_cm_proto_v1:kickout_client(Node, ClientId)).
 
+kickout_clients(ClientIds) when is_list(ClientIds) ->
+    F = fun(Node) ->
+        emqx_management_proto_v3:kickout_clients(Node, ClientIds)
+    end,
+    Results = lists:map(F, emqx:running_nodes()),
+    case lists:filter(fun(Res) -> Res =/= ok end, Results) of
+        [] ->
+            ok;
+        [Result | _] ->
+            unwrap_rpc(Result)
+    end.
+
+do_kickout_clients(ClientIds) when is_list(ClientIds) ->
+    F = fun(ClientId) ->
+        ChanPids = emqx_cm:lookup_channels(local, ClientId),
+        lists:foreach(
+            fun(ChanPid) -> emqx_cm:kick_session(ClientId, ChanPid) end,
+            ChanPids
+        )
+    end,
+    lists:foreach(F, ClientIds).
+
 list_authz_cache(ClientId) ->
     call_client(ClientId, list_authz_cache).
 

+ 26 - 0
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -36,6 +36,7 @@
 
 -export([
     clients/2,
+    kickout_clients/2,
     client/2,
     subscriptions/2,
     authz_cache/2,
@@ -88,6 +89,7 @@ api_spec() ->
 paths() ->
     [
         "/clients",
+        "/clients/kickout/bulk",
         "/clients/:clientid",
         "/clients/:clientid/authorization/cache",
         "/clients/:clientid/subscriptions",
@@ -211,6 +213,21 @@ schema("/clients") ->
             }
         }
     };
+schema("/clients/kickout/bulk") ->
+    #{
+        'operationId' => kickout_clients,
+        post => #{
+            description => ?DESC(kickout_clients),
+            tags => ?TAGS,
+            'requestBody' => hoconsc:mk(
+                hoconsc:array(binary()),
+                #{desc => <<"The list of Client IDs that need to be kicked out">>}
+            ),
+            responses => #{
+                204 => <<"Kick out clients successfully">>
+            }
+        }
+    };
 schema("/clients/:clientid") ->
     #{
         'operationId' => client,
@@ -568,6 +585,15 @@ fields(unsubscribe) ->
 clients(get, #{query_string := QString}) ->
     list_clients(QString).
 
+kickout_clients(post, #{body := ClientIDs}) ->
+    case emqx_mgmt:kickout_clients(ClientIDs) of
+        ok ->
+            {204};
+        {error, Reason} ->
+            Message = list_to_binary(io_lib:format("~p", [Reason])),
+            {500, #{code => <<"UNKNOW_ERROR">>, message => Message}}
+    end.
+
 client(get, #{bindings := Bindings}) ->
     lookup(Bindings);
 client(delete, #{bindings := Bindings}) ->

+ 7 - 1
apps/emqx_management/src/proto/emqx_management_proto_v3.erl

@@ -32,7 +32,9 @@
 
     call_client/3,
 
-    get_full_config/1
+    get_full_config/1,
+
+    kickout_clients/2
 ]).
 
 -include_lib("emqx/include/bpapi.hrl").
@@ -78,3 +80,7 @@ call_client(Node, ClientId, Req) ->
 -spec get_full_config(node()) -> map() | list() | {badrpc, _}.
 get_full_config(Node) ->
     rpc:call(Node, emqx_mgmt_api_configs, get_full_config, []).
+
+-spec kickout_clients(node(), [emqx_types:clientid()]) -> ok | {badrpc, _}.
+kickout_clients(Node, ClientIds) ->
+    rpc:call(Node, emqx_mgmt, do_kickout_clients, [ClientIds]).

+ 41 - 0
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -167,6 +167,47 @@ t_clients(_) ->
     AfterKickoutResponse1 = emqx_mgmt_api_test_util:request_api(get, Client1Path),
     ?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, AfterKickoutResponse1).
 
+t_kickout_clients(_) ->
+    process_flag(trap_exit, true),
+
+    ClientId1 = <<"client1">>,
+    ClientId2 = <<"client2">>,
+    ClientId3 = <<"client3">>,
+
+    {ok, C1} = emqtt:start_link(#{
+        clientid => ClientId1,
+        proto_ver => v5,
+        properties => #{'Session-Expiry-Interval' => 120}
+    }),
+    {ok, _} = emqtt:connect(C1),
+    {ok, C2} = emqtt:start_link(#{clientid => ClientId2}),
+    {ok, _} = emqtt:connect(C2),
+    {ok, C3} = emqtt:start_link(#{clientid => ClientId3}),
+    {ok, _} = emqtt:connect(C3),
+
+    timer:sleep(300),
+
+    %% get /clients
+    ClientsPath = emqx_mgmt_api_test_util:api_path(["clients"]),
+    {ok, Clients} = emqx_mgmt_api_test_util:request_api(get, ClientsPath),
+    ClientsResponse = emqx_utils_json:decode(Clients, [return_maps]),
+    ClientsMeta = maps:get(<<"meta">>, ClientsResponse),
+    ClientsPage = maps:get(<<"page">>, ClientsMeta),
+    ClientsLimit = maps:get(<<"limit">>, ClientsMeta),
+    ClientsCount = maps:get(<<"count">>, ClientsMeta),
+    ?assertEqual(ClientsPage, 1),
+    ?assertEqual(ClientsLimit, emqx_mgmt:default_row_limit()),
+    ?assertEqual(ClientsCount, 3),
+
+    %% kickout clients
+    KickoutPath = emqx_mgmt_api_test_util:api_path(["clients", "kickout", "bulk"]),
+    KickoutBody = [ClientId1, ClientId2, ClientId3],
+    {ok, _} = emqx_mgmt_api_test_util:request_api_with_body(post, KickoutPath, KickoutBody),
+
+    {ok, Clients2} = emqx_mgmt_api_test_util:request_api(get, ClientsPath),
+    ClientsResponse2 = emqx_utils_json:decode(Clients2, [return_maps]),
+    ?assertMatch(#{<<"meta">> := #{<<"count">> := 0}}, ClientsResponse2).
+
 t_query_clients_with_time(_) ->
     process_flag(trap_exit, true),
 

+ 5 - 0
rel/i18n/emqx_mgmt_api_clients.hocon

@@ -5,6 +5,11 @@ list_clients.desc:
 list_clients.label:
 """List clients"""
 
+kickout_clients.desc:
+"""Kick out a batch of client by client IDs"""
+kickout_clients.label:
+"""Kick out a batch of client by client IDs"""
+
 clients_info_from_id.desc:
 """Get clients info by client ID"""
 clients_info_from_id.label: