فهرست منبع

fix(api): client sub & unsub batch

DDDHuang 3 سال پیش
والد
کامیت
6c9cad366b

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -13,6 +13,7 @@
 {emqx_gateway_http,1}.
 {emqx_gateway_http,1}.
 {emqx_license,1}.
 {emqx_license,1}.
 {emqx_management,1}.
 {emqx_management,1}.
+{emqx_management,2}.
 {emqx_mgmt_api_plugins,1}.
 {emqx_mgmt_api_plugins,1}.
 {emqx_mgmt_cluster,1}.
 {emqx_mgmt_cluster,1}.
 {emqx_mgmt_trace,1}.
 {emqx_mgmt_trace,1}.

+ 16 - 2
apps/emqx_management/src/emqx_management.appup.src

@@ -1,7 +1,21 @@
 %% -*- mode: erlang -*-
 %% -*- mode: erlang -*-
 %% Unless you know what you are doing, DO NOT edit manually!!
 %% Unless you know what you are doing, DO NOT edit manually!!
 {VSN,
 {VSN,
-  [{"5.0.0",[{load_module,emqx_mgmt_cli,brutal_purge,soft_purge,[]}]},
+  [{"5.0.0",[
+    {load_module,emqx_mgmt_cli,brutal_purge,soft_purge,[]},
+    {add_module,emqx_management_proto_v2},
+    {load_module,emqx_mgmt_api_clients,brutal_purge,soft_purge,[]},
+    {load_module,emqx_mgmt_api_publish,brutal_purge,soft_purge,[]},
+    {load_module,emqx_mgmt_util,brutal_purge,soft_purge,[]},
+    {load_module,emqx_mgmt,brutal_purge,soft_purge,[]}
+   ]},
    {<<".*">>,[]}],
    {<<".*">>,[]}],
-  [{"5.0.0",[{load_module,emqx_mgmt_cli,brutal_purge,soft_purge,[]}]},
+  [{"5.0.0",[
+    {load_module,emqx_mgmt_cli,brutal_purge,soft_purge,[]},
+    {delete_module,emqx_management_proto_v2},
+    {load_module,emqx_mgmt_api_clients,brutal_purge,soft_purge,[]},
+    {load_module,emqx_mgmt_api_publish,brutal_purge,soft_purge,[]},
+    {load_module,emqx_mgmt_util,brutal_purge,soft_purge,[]},
+    {load_module,emqx_mgmt,brutal_purge,soft_purge,[]}
+   ]},
    {<<".*">>,[]}]}.
    {<<".*">>,[]}]}.

+ 26 - 2
apps/emqx_management/src/emqx_mgmt.erl

@@ -83,7 +83,9 @@
     do_subscribe/2,
     do_subscribe/2,
     publish/1,
     publish/1,
     unsubscribe/2,
     unsubscribe/2,
-    do_unsubscribe/2
+    do_unsubscribe/2,
+    unsubscribe_batch/2,
+    do_unsubscribe_batch/2
 ]).
 ]).
 
 
 %% Alarms
 %% Alarms
@@ -417,7 +419,6 @@ do_subscribe(ClientId, TopicTables) ->
         [{_, Pid}] -> Pid ! {subscribe, TopicTables}
         [{_, Pid}] -> Pid ! {subscribe, TopicTables}
     end.
     end.
 
 
-%%TODO: ???
 publish(Msg) ->
 publish(Msg) ->
     emqx_metrics:inc_msg(Msg),
     emqx_metrics:inc_msg(Msg),
     emqx:publish(Msg).
     emqx:publish(Msg).
@@ -445,6 +446,29 @@ do_unsubscribe(ClientId, Topic) ->
         [{_, Pid}] -> Pid ! {unsubscribe, [emqx_topic:parse(Topic)]}
         [{_, Pid}] -> Pid ! {unsubscribe, [emqx_topic:parse(Topic)]}
     end.
     end.
 
 
+-spec unsubscribe_batch(emqx_types:clientid(), [emqx_types:topic()]) ->
+    {unsubscribe, _} | {error, channel_not_found}.
+unsubscribe_batch(ClientId, Topics) ->
+    unsubscribe_batch(mria_mnesia:running_nodes(), ClientId, Topics).
+
+-spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) ->
+    {unsubscribe_batch, _} | {error, channel_not_found}.
+unsubscribe_batch([Node | Nodes], ClientId, Topics) ->
+    case wrap_rpc(emqx_management_proto_v2:unsubscribe_batch(Node, ClientId, Topics)) of
+        {error, _} -> unsubscribe_batch(Nodes, ClientId, Topics);
+        Re -> Re
+    end;
+unsubscribe_batch([], _ClientId, _Topics) ->
+    {error, channel_not_found}.
+
+-spec do_unsubscribe_batch(emqx_types:clientid(), [emqx_types:topic()]) ->
+    {unsubscribe_batch, _} | {error, _}.
+do_unsubscribe_batch(ClientId, Topics) ->
+    case ets:lookup(emqx_channel, ClientId) of
+        [] -> {error, channel_not_found};
+        [{_, Pid}] -> Pid ! {unsubscribe, [emqx_topic:parse(Topic) || Topic <- Topics]}
+    end.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Get Alarms
 %% Get Alarms
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------

+ 65 - 16
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -39,8 +39,9 @@
     subscriptions/2,
     subscriptions/2,
     authz_cache/2,
     authz_cache/2,
     subscribe/2,
     subscribe/2,
-    unsubscribe/2,
     subscribe_batch/2,
     subscribe_batch/2,
+    unsubscribe/2,
+    unsubscribe_batch/2,
     set_keepalive/2
     set_keepalive/2
 ]).
 ]).
 
 
@@ -88,7 +89,9 @@ paths() ->
         "/clients/:clientid/authorization/cache",
         "/clients/:clientid/authorization/cache",
         "/clients/:clientid/subscriptions",
         "/clients/:clientid/subscriptions",
         "/clients/:clientid/subscribe",
         "/clients/:clientid/subscribe",
+        "/clients/:clientid/subscribe/bulk",
         "/clients/:clientid/unsubscribe",
         "/clients/:clientid/unsubscribe",
+        "/clients/:clientid/unsubscribe/bulk",
         "/clients/:clientid/keepalive"
         "/clients/:clientid/keepalive"
     ].
     ].
 
 
@@ -293,6 +296,21 @@ schema("/clients/:clientid/subscribe") ->
             }
             }
         }
         }
     };
     };
+schema("/clients/:clientid/subscribe/bulk") ->
+    #{
+        'operationId' => subscribe_batch,
+        post => #{
+            description => <<"Subscribe">>,
+            parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}],
+            'requestBody' => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, subscribe))),
+            responses => #{
+                200 => hoconsc:array(hoconsc:ref(emqx_mgmt_api_subscriptions, subscription)),
+                404 => emqx_dashboard_swagger:error_codes(
+                    ['CLIENTID_NOT_FOUND'], <<"Client id not found">>
+                )
+            }
+        }
+    };
 schema("/clients/:clientid/unsubscribe") ->
 schema("/clients/:clientid/unsubscribe") ->
     #{
     #{
         'operationId' => unsubscribe,
         'operationId' => unsubscribe,
@@ -308,6 +326,21 @@ schema("/clients/:clientid/unsubscribe") ->
             }
             }
         }
         }
     };
     };
+schema("/clients/:clientid/unsubscribe/bulk") ->
+    #{
+        'operationId' => unsubscribe_batch,
+        post => #{
+            description => <<"Unsubscribe">>,
+            parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}],
+            'requestBody' => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, unsubscribe))),
+            responses => #{
+                204 => <<"Unsubscribe OK">>,
+                404 => emqx_dashboard_swagger:error_codes(
+                    ['CLIENTID_NOT_FOUND'], <<"Client id not found">>
+                )
+            }
+        }
+    };
 schema("/clients/:clientid/keepalive") ->
 schema("/clients/:clientid/keepalive") ->
     #{
     #{
         'operationId' => set_keepalive,
         'operationId' => set_keepalive,
@@ -543,11 +576,6 @@ subscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) ->
     Opts = emqx_map_lib:unsafe_atom_key_map(TopicInfo),
     Opts = emqx_map_lib:unsafe_atom_key_map(TopicInfo),
     subscribe(Opts#{clientid => ClientID}).
     subscribe(Opts#{clientid => ClientID}).
 
 
-unsubscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) ->
-    Topic = maps:get(<<"topic">>, TopicInfo),
-    unsubscribe(#{clientid => ClientID, topic => Topic}).
-
-%% TODO: batch
 subscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}) ->
 subscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}) ->
     Topics =
     Topics =
         [
         [
@@ -556,6 +584,14 @@ subscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}
         ],
         ],
     subscribe_batch(#{clientid => ClientID, topics => Topics}).
     subscribe_batch(#{clientid => ClientID, topics => Topics}).
 
 
+unsubscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) ->
+    Topic = maps:get(<<"topic">>, TopicInfo),
+    unsubscribe(#{clientid => ClientID, topic => Topic}).
+
+unsubscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}) ->
+    Topics = [Topic || #{<<"topic">> := Topic} <- TopicInfos],
+    unsubscribe_batch(#{clientid => ClientID, topics => Topics}).
+
 subscriptions(get, #{bindings := #{clientid := ClientID}}) ->
 subscriptions(get, #{bindings := #{clientid := ClientID}}) ->
     case emqx_mgmt:list_client_subscriptions(ClientID) of
     case emqx_mgmt:list_client_subscriptions(ClientID) of
         [] ->
         [] ->
@@ -668,9 +704,20 @@ subscribe(#{clientid := ClientID, topic := Topic} = Sub) ->
         {error, Reason} ->
         {error, Reason} ->
             Message = list_to_binary(io_lib:format("~p", [Reason])),
             Message = list_to_binary(io_lib:format("~p", [Reason])),
             {500, #{code => <<"UNKNOW_ERROR">>, message => Message}};
             {500, #{code => <<"UNKNOW_ERROR">>, message => Message}};
-        {ok, Node} ->
-            Response = Sub#{node => Node},
-            {200, Response}
+        {ok, SubInfo} ->
+            {200, SubInfo}
+    end.
+
+subscribe_batch(#{clientid := ClientID, topics := Topics}) ->
+    case lookup(#{clientid => ClientID}) of
+        {200, _} ->
+            ArgList = [
+                [ClientID, Topic, maps:with([qos, nl, rap, rh], Sub)]
+             || #{topic := Topic} = Sub <- Topics
+            ],
+            {200, emqx_mgmt_util:batch_operation(?MODULE, do_subscribe, ArgList)};
+        {404, ?CLIENT_ID_NOT_FOUND} ->
+            {404, ?CLIENT_ID_NOT_FOUND}
     end.
     end.
 
 
 unsubscribe(#{clientid := ClientID, topic := Topic}) ->
 unsubscribe(#{clientid := ClientID, topic := Topic}) ->
@@ -681,12 +728,14 @@ unsubscribe(#{clientid := ClientID, topic := Topic}) ->
             {204}
             {204}
     end.
     end.
 
 
-subscribe_batch(#{clientid := ClientID, topics := Topics}) ->
-    ArgList = [
-        [ClientID, Topic, maps:with([qos, nl, rap, rh], Sub)]
-     || #{topic := Topic} = Sub <- Topics
-    ],
-    emqx_mgmt_util:batch_operation(?MODULE, do_subscribe, ArgList).
+unsubscribe_batch(#{clientid := ClientID, topics := Topics}) ->
+    case lookup(#{clientid => ClientID}) of
+        {200, _} ->
+            _ = emqx_mgmt:unsubscribe_batch(ClientID, Topics),
+            {204};
+        {404, ?CLIENT_ID_NOT_FOUND} ->
+            {404, ?CLIENT_ID_NOT_FOUND}
+    end.
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% internal function
 %% internal function
@@ -700,7 +749,7 @@ do_subscribe(ClientID, Topic0, Options) ->
         {subscribe, Subscriptions, Node} ->
         {subscribe, Subscriptions, Node} ->
             case proplists:is_defined(Topic, Subscriptions) of
             case proplists:is_defined(Topic, Subscriptions) of
                 true ->
                 true ->
-                    {ok, Node};
+                    {ok, Options#{node => Node, clientid => ClientID, topic => Topic}};
                 false ->
                 false ->
                     {error, unknow_error}
                     {error, unknow_error}
             end
             end

+ 5 - 3
apps/emqx_management/src/emqx_mgmt_api_publish.erl

@@ -142,12 +142,14 @@ message(Map) ->
             {error, R}
             {error, R}
     end.
     end.
 
 
-encode_payload(plain, Payload) -> {ok, Payload};
+encode_payload(plain, Payload) ->
+    {ok, Payload};
 encode_payload(base64, Payload) ->
 encode_payload(base64, Payload) ->
     try
     try
         {ok, base64:decode(Payload)}
         {ok, base64:decode(Payload)}
-    catch _:_ ->
-        {error, {decode_base64_payload_failed, Payload}}
+    catch
+        _:_ ->
+            {error, {decode_base64_payload_failed, Payload}}
     end.
     end.
 
 
 messages(List) ->
 messages(List) ->

+ 20 - 14
apps/emqx_management/src/emqx_mgmt_util.erl

@@ -202,23 +202,29 @@ json_content_schema(Schema, Desc) ->
 
 
 %%%==============================================================================================
 %%%==============================================================================================
 batch_operation(Module, Function, ArgsList) ->
 batch_operation(Module, Function, ArgsList) ->
-    Failed = batch_operation(Module, Function, ArgsList, []),
-    Len = erlang:length(Failed),
-    Success = erlang:length(ArgsList) - Len,
-    Fun =
-        fun({Args, Reason}, Detail) ->
-            [#{data => Args, reason => io_lib:format("~p", [Reason])} | Detail]
-        end,
-    #{success => Success, failed => Len, detail => lists:foldl(Fun, [], Failed)}.
+    {Succeed, Failed} = batch_operation(Module, Function, ArgsList, {[], []}),
+    case erlang:length(Failed) of
+        0 ->
+            Succeed;
+        _FLen ->
+            Fun =
+                fun({Args, Reason}, Detail) ->
+                    [
+                        #{data => Args, reason => list_to_binary(io_lib:format("~p", [Reason]))}
+                        | Detail
+                    ]
+                end,
+            #{succeed => Succeed, failed => lists:foldl(Fun, [], Failed)}
+    end.
 
 
-batch_operation(_Module, _Function, [], Failed) ->
-    lists:reverse(Failed);
-batch_operation(Module, Function, [Args | ArgsList], Failed) ->
+batch_operation(_Module, _Function, [], {Succeed, Failed}) ->
+    {lists:reverse(Succeed), lists:reverse(Failed)};
+batch_operation(Module, Function, [Args | ArgsList], {Succeed, Failed}) ->
     case erlang:apply(Module, Function, Args) of
     case erlang:apply(Module, Function, Args) of
-        ok ->
-            batch_operation(Module, Function, ArgsList, Failed);
+        {ok, Res} ->
+            batch_operation(Module, Function, ArgsList, {[Res | Succeed], Failed});
         {error, Reason} ->
         {error, Reason} ->
-            batch_operation(Module, Function, ArgsList, [{Args, Reason} | Failed])
+            batch_operation(Module, Function, ArgsList, {Succeed, [{Args, Reason} | Failed]})
     end.
     end.
 
 
 properties(Props) ->
 properties(Props) ->

+ 35 - 0
apps/emqx_management/src/proto/emqx_management_proto_v2.erl

@@ -0,0 +1,35 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_management_proto_v2).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+
+    unsubscribe_batch/3
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.0.1".
+
+-spec unsubscribe_batch(node(), emqx_types:clientid(), [emqx_types:topic()]) ->
+    {unsubscribe, _} | {error, _} | {badrpc, _}.
+unsubscribe_batch(Node, ClientId, Topics) ->
+    rpc:call(Node, emqx_mgmt, do_unsubscribe_batch, [ClientId, Topics]).