Kaynağa Gözat

Merge branch 'master' into health_check_timeout

Yang Miao 4 yıl önce
ebeveyn
işleme
b528862c67
50 değiştirilmiş dosya ile 1099 ekleme ve 452 silme
  1. 1 1
      .github/workflows/run_api_tests.yaml
  2. 3 1
      README.md
  3. 37 28
      apps/emqx/src/emqx_cm.erl
  4. 23 0
      apps/emqx/src/emqx_cm.hrl
  5. 14 0
      apps/emqx/src/emqx_rpc.erl
  6. 4 2
      apps/emqx/src/emqx_trace/emqx_trace.erl
  7. 0 12
      apps/emqx/src/proto/emqx_broker_proto_v1.erl
  8. 71 0
      apps/emqx/src/proto/emqx_cm_proto_v1.erl
  9. 27 13
      apps/emqx/test/emqx_bpapi_static_checks.erl
  10. 3 1
      apps/emqx/test/emqx_bpapi_suite.erl
  11. 3 5
      apps/emqx/test/emqx_cm_SUITE.erl
  12. 12 12
      apps/emqx/test/emqx_trace_SUITE.erl
  13. 6 4
      apps/emqx_conf/src/emqx_cluster_rpc.erl
  14. 17 0
      apps/emqx_dashboard/src/emqx_dashboard_swagger.erl
  15. 9 18
      apps/emqx_exhook/src/emqx_exhook_api.erl
  16. 46 0
      apps/emqx_exhook/src/proto/emqx_exhook_proto_v1.erl
  17. 2 1
      apps/emqx_gateway/src/emqx_gateway_api_clients.erl
  18. 97 38
      apps/emqx_gateway/src/emqx_gateway_cm.erl
  19. 15 36
      apps/emqx_gateway/src/emqx_gateway_http.erl
  20. 77 0
      apps/emqx_gateway/src/proto/emqx_gateway_cm_proto_v1.erl
  21. 2 1
      apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl
  22. 1 0
      apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl
  23. 7 4
      apps/emqx_management/src/emqx_mgmt.erl
  24. 6 10
      apps/emqx_management/src/emqx_mgmt_api.erl
  25. 6 3
      apps/emqx_management/src/emqx_mgmt_api_clients.erl
  26. 2 2
      apps/emqx_management/src/emqx_mgmt_api_configs.erl
  27. 19 3
      apps/emqx_management/src/emqx_mgmt_api_trace.erl
  28. 1 1
      apps/emqx_management/src/emqx_mgmt_cli.erl
  29. 6 0
      apps/emqx_management/src/proto/emqx_management_proto_v1.erl
  30. 4 4
      apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl
  31. 19 0
      apps/emqx_modules/include/emqx_modules.hrl
  32. 6 5
      apps/emqx_modules/src/emqx_delayed_api.erl
  33. 29 25
      apps/emqx_modules/src/emqx_event_message_api.erl
  34. 31 11
      apps/emqx_modules/src/emqx_modules_schema.erl
  35. 3 1
      apps/emqx_modules/src/emqx_rewrite_api.erl
  36. 2 2
      apps/emqx_modules/src/emqx_telemetry_api.erl
  37. 223 117
      apps/emqx_modules/src/emqx_topic_metrics_api.erl
  38. 1 1
      apps/emqx_prometheus/rebar.config
  39. 73 36
      apps/emqx_prometheus/src/emqx_prometheus_api.erl
  40. 3 3
      apps/emqx_prometheus/src/emqx_prometheus_schema.erl
  41. 1 0
      apps/emqx_resource/rebar.config
  42. 6 6
      apps/emqx_resource/src/emqx_resource.erl
  43. 5 1
      apps/emqx_resource/src/emqx_resource_instance.erl
  44. 62 0
      apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl
  45. 29 11
      apps/emqx_resource/test/emqx_resource_SUITE.erl
  46. 16 5
      apps/emqx_resource/test/emqx_test_resource.erl
  47. 50 25
      apps/emqx_statsd/src/emqx_statsd_api.erl
  48. 17 1
      apps/emqx_statsd/src/emqx_statsd_schema.erl
  49. 1 1
      mix.exs
  50. 1 1
      rebar.config

+ 1 - 1
.github/workflows/run_api_tests.yaml

@@ -86,7 +86,7 @@ jobs:
     - uses: actions/checkout@v2
       with:
         repository: emqx/emqx-fvt
-        ref: 1.0.3-dev2
+        ref: 1.0.4-dev1
         path: .
     - uses: actions/setup-java@v1
       with:

+ 3 - 1
README.md

@@ -116,7 +116,9 @@ Visiting [EMQ X FAQ](https://docs.emqx.io/en/broker/latest/faq/faq.html) to get
 
 ### Questions
 
-[GitHub Discussions](https://github.com/emqx/emqx/discussions) is where you can ask questions, and share ideas.
+- [GitHub Discussions](https://github.com/emqx/emqx/discussions) is where you can ask questions, and share ideas.
+- [Slack](https://slack-invite.emqx.io/) is where you can ask and discuss questions or contact our teams directly.
+- [Discord](https://discord.gg/xYGf3fQnES) is where you can get help and upcoming events related to IoT technologies.
 
 ### Proposals
 

+ 37 - 28
apps/emqx/src/emqx_cm.erl

@@ -80,9 +80,15 @@
         , mark_channel_connected/1
         , mark_channel_disconnected/1
         , get_connected_client_count/0
+
+        , do_kick_session/3
+        , do_get_chan_stats/2
+        , do_get_chan_info/2
+        , do_get_chann_conn_mod/2
         ]).
 
 -export_type([ channel_info/0
+             , chan_pid/0
              ]).
 
 -type(chan_pid() :: pid()).
@@ -92,6 +98,8 @@
                         , _Stats :: emqx_types:stats()
                         }).
 
+-include("emqx_cm.hrl").
+
 %% Tables for channel management.
 -define(CHAN_TAB, emqx_channel).
 -define(CHAN_CONN_TAB, emqx_channel_conn).
@@ -111,10 +119,6 @@
 %% Server name
 -define(CM, ?MODULE).
 
--define(T_KICK, 5_000).
--define(T_GET_INFO, 5_000).
--define(T_TAKEOVER, 15_000).
-
 %% linting overrides
 -elvis([ {elvis_style, invalid_dynamic_call, #{ignore => [emqx_cm]}}
        , {elvis_style, god_modules, #{ignore => [emqx_cm]}}
@@ -181,16 +185,19 @@ connection_closed(ClientId, ChanPid) ->
 get_chan_info(ClientId) ->
     with_channel(ClientId, fun(ChanPid) -> get_chan_info(ClientId, ChanPid) end).
 
--spec(get_chan_info(emqx_types:clientid(), chan_pid())
+-spec(do_get_chan_info(emqx_types:clientid(), chan_pid())
       -> maybe(emqx_types:infos())).
-get_chan_info(ClientId, ChanPid) when node(ChanPid) == node() ->
+do_get_chan_info(ClientId, ChanPid) ->
     Chan = {ClientId, ChanPid},
     try ets:lookup_element(?CHAN_INFO_TAB, Chan, 2)
     catch
         error:badarg -> undefined
-    end;
+    end.
+
+-spec(get_chan_info(emqx_types:clientid(), chan_pid())
+      -> maybe(emqx_types:infos())).
 get_chan_info(ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid], ?T_GET_INFO).
+    wrap_rpc(emqx_cm_proto_v1:get_chan_info(ClientId, ChanPid)).
 
 %% @doc Update infos of the channel.
 -spec(set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean()).
@@ -206,16 +213,19 @@ set_chan_info(ClientId, Info) when is_binary(ClientId) ->
 get_chan_stats(ClientId) ->
     with_channel(ClientId, fun(ChanPid) -> get_chan_stats(ClientId, ChanPid) end).
 
--spec(get_chan_stats(emqx_types:clientid(), chan_pid())
+-spec(do_get_chan_stats(emqx_types:clientid(), chan_pid())
       -> maybe(emqx_types:stats())).
-get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() ->
+do_get_chan_stats(ClientId, ChanPid) ->
     Chan = {ClientId, ChanPid},
     try ets:lookup_element(?CHAN_INFO_TAB, Chan, 3)
     catch
         error:badarg -> undefined
-    end;
+    end.
+
+-spec(get_chan_stats(emqx_types:clientid(), chan_pid())
+      -> maybe(emqx_types:stats())).
 get_chan_stats(ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO).
+    wrap_rpc(emqx_cm_proto_v1:get_chan_stats(ClientId, ChanPid)).
 
 %% @doc Set channel's stats.
 -spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean()).
@@ -368,7 +378,7 @@ do_takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
             {living, ConnMod, ChanPid, Session}
     end;
 do_takeover_session(ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER).
+    wrap_rpc(emqx_cm_proto_v1:takeover_session(ClientId, ChanPid)).
 
 %% @doc Discard all the sessions identified by the ClientId.
 -spec(discard_session(emqx_types:clientid()) -> ok).
@@ -422,24 +432,20 @@ discard_session(ClientId, ChanPid) ->
 kick_session(ClientId, ChanPid) ->
     kick_session(kick, ClientId, ChanPid).
 
-%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action).
-kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() ->
+-spec do_kick_session(kick | discard, emqx_types:clientid(), chan_pid()) -> ok.
+do_kick_session(Action, ClientId, ChanPid) ->
     case get_chann_conn_mod(ClientId, ChanPid) of
         undefined ->
             %% already deregistered
             ok;
         ConnMod when is_atom(ConnMod) ->
             ok = kick_or_kill(Action, ConnMod, ChanPid)
-    end;
+    end.
+
+%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action).
 kick_session(Action, ClientId, ChanPid) ->
-    %% call remote node on the old APIs because we do not know if they have upgraded
-    %% to have kick_session/3
-    Function = case Action of
-                   discard -> discard_session;
-                   kick -> kick_session
-               end,
     try
-        rpc_call(node(ChanPid), Function, [ClientId, ChanPid], ?T_KICK)
+        wrap_rpc(emqx_cm_proto_v1:kick_session(Action, ClientId, ChanPid))
     catch
         Error : Reason ->
             %% This should mostly be RPC failures.
@@ -525,8 +531,8 @@ lookup_client({clientid, ClientId}) ->
           , Rec <- ets:lookup(emqx_channel_info, Key)].
 
 %% @private
-rpc_call(Node, Fun, Args, Timeout) ->
-    case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of
+wrap_rpc(Result) ->
+    case Result of
         {badrpc, Reason} ->
             %% since emqx app 4.3.10, the 'kick' and 'discard' calls handler
             %% should catch all exceptions and always return 'ok'.
@@ -599,14 +605,17 @@ update_stats({Tab, Stat, MaxStat}) ->
         Size -> emqx_stats:setstat(Stat, MaxStat, Size)
     end.
 
-get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() ->
+-spec do_get_chann_conn_mod(emqx_types:clientid(), chan_pid()) ->
+          module() | undefined.
+do_get_chann_conn_mod(ClientId, ChanPid) ->
     Chan = {ClientId, ChanPid},
     try [ConnMod] = ets:lookup_element(?CHAN_CONN_TAB, Chan, 2), ConnMod
     catch
         error:badarg -> undefined
-    end;
+    end.
+
 get_chann_conn_mod(ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO).
+    wrap_rpc(emqx_cm_proto_v1:get_chann_conn_mod(ClientId, ChanPid)).
 
 mark_channel_connected(ChanPid) ->
     ?tp(emqx_cm_connected_client_count_inc, #{}),

+ 23 - 0
apps/emqx/src/emqx_cm.hrl

@@ -0,0 +1,23 @@
+%%-------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-ifndef(EMQX_CM_HRL).
+-define(EMQX_CM_HRL, true).
+
+-define(T_KICK, 5_000).
+-define(T_GET_INFO, 5_000).
+-define(T_TAKEOVER, 15_000).
+
+-endif.

+ 14 - 0
apps/emqx/src/emqx_rpc.erl

@@ -25,6 +25,8 @@
         , cast/5
         , multicall/4
         , multicall/5
+
+        , unwrap_erpc/1
         ]).
 
 -export_type([ badrpc/0
@@ -106,3 +108,15 @@ filter_result(Delivery) ->
 
 max_client_num() ->
     emqx:get_config([rpc, tcp_client_num], ?DefaultClientNum).
+
+-spec unwrap_erpc(emqx_rpc:erpc(A)) -> A | {error, _Err}.
+unwrap_erpc({ok, A}) ->
+    A;
+unwrap_erpc({throw, A}) ->
+    {error, A};
+unwrap_erpc({error, {exception, Err, _Stack}}) ->
+    {error, Err};
+unwrap_erpc({error, {exit, Err}}) ->
+    {error, Err};
+unwrap_erpc({error, {erpc, Err}}) ->
+    {error, Err}.

+ 4 - 2
apps/emqx/src/emqx_trace/emqx_trace.erl

@@ -145,7 +145,7 @@ list(Enable) ->
     ets:match_object(?TRACE, #?TRACE{enable = Enable, _ = '_'}).
 
 -spec create([{Key :: binary(), Value :: binary()}] | #{atom() => binary()}) ->
-    ok | {error, {duplicate_condition, iodata()} | {already_existed, iodata()} | iodata()}.
+{ok, #?TRACE{}} | {error, {duplicate_condition, iodata()} | {already_existed, iodata()} | iodata()}.
 create(Trace) ->
     case mnesia:table_info(?TRACE, size) < ?MAX_SIZE of
         true ->
@@ -291,7 +291,9 @@ insert_new_trace(Trace) ->
                 #?TRACE{start_at = StartAt, type = Type, filter = Filter} = Trace,
                 Match = #?TRACE{_ = '_', start_at = StartAt, type = Type, filter = Filter},
                 case mnesia:match_object(?TRACE, Match, read) of
-                    [] -> mnesia:write(?TRACE, Trace, write);
+                    [] ->
+                        ok = mnesia:write(?TRACE, Trace, write),
+                        {ok, Trace};
                     [#?TRACE{name = Name}] -> mnesia:abort({duplicate_condition, Name})
                 end;
             [#?TRACE{name = Name}] -> mnesia:abort({already_existed, Name})

+ 0 - 12
apps/emqx/src/proto/emqx_broker_proto_v1.erl

@@ -25,9 +25,6 @@
         , list_client_subscriptions/2
         , list_subscriptions_via_topic/2
 
-        , lookup_client/2
-        , kickout_client/2
-
         , start_listener/2
         , stop_listener/2
         , restart_listener/2
@@ -48,15 +45,6 @@ forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
 forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
     emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]).
 
--spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}.
-kickout_client(Node, ClientId) ->
-    rpc:call(Node, emqx_cm, kick_session, [ClientId]).
-
--spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) ->
-          [emqx_cm:channel_info()] | {badrpc, _}.
-lookup_client(Node, Key) ->
-    rpc:call(Node, emqx_cm, lookup_client, [Key]).
-
 -spec list_client_subscriptions(node(), emqx_types:clientid()) ->
                 [{emqx_types:topic(), emqx_types:subopts()}]
               | emqx_rpc:badrpc().

+ 71 - 0
apps/emqx/src/proto/emqx_cm_proto_v1.erl

@@ -0,0 +1,71 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_cm_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([ introduced_in/0
+
+        , lookup_client/2
+        , kickout_client/2
+
+        , get_chan_stats/2
+        , get_chan_info/2
+        , get_chann_conn_mod/2
+
+        , takeover_session/2
+        , kick_session/3
+        ]).
+
+-include("bpapi.hrl").
+-include("emqx_cm.hrl").
+
+introduced_in() ->
+    "5.0.0".
+
+-spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}.
+kickout_client(Node, ClientId) ->
+    rpc:call(Node, emqx_cm, kick_session, [ClientId]).
+
+-spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) ->
+          [emqx_cm:channel_info()] | {badrpc, _}.
+lookup_client(Node, Key) ->
+    rpc:call(Node, emqx_cm, lookup_client, [Key]).
+
+-spec get_chan_stats(emqx_types:clientid(), emqx_cm:chan_pid()) -> emqx_types:stats() | {badrpc, _}.
+get_chan_stats(ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_cm, do_get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO * 2).
+
+-spec get_chan_info(emqx_types:clientid(), emqx_cm:chan_pid()) -> emqx_types:infos() | {badrpc, _}.
+get_chan_info(ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_cm, do_get_chan_info, [ClientId, ChanPid], ?T_GET_INFO * 2).
+
+-spec get_chann_conn_mod(emqx_types:clientid(), emqx_cm:chan_pid()) -> module() | undefined | {badrpc, _}.
+get_chann_conn_mod(ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_cm, do_get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO * 2).
+
+-spec takeover_session(emqx_types:clientid(), emqx_cm:chan_pid()) ->
+                none
+              | {expired | persistent, emqx_session:session()}
+              | {living, _ConnMod :: atom(), emqx_cm:chan_pid(), emqx_session:session()}
+              | {badrpc, _}.
+takeover_session(ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_cm, takeover_session, [ClientId, ChanPid], ?T_TAKEOVER * 2).
+
+-spec kick_session(kick | discard, emqx_types:clientid(), emqx_cm:chan_pid()) -> ok | {badrpc, _}.
+kick_session(Action, ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_cm, do_kick_session, [Action, ClientId, ChanPid], ?T_KICK * 2).

+ 27 - 13
apps/emqx/test/emqx_bpapi_static_checks.erl

@@ -49,9 +49,18 @@
 %% List of known RPC backend modules:
 -define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc").
 %% List of known functions also known to do RPC:
--define(RPC_FUNCTIONS, "emqx_cluster_rpc:multicall/3, emqx_cluster_rpc:multicall/5").
+-define(RPC_FUNCTIONS, "emqx_cluster_rpc:multicall/3, emqx_cluster_rpc:multicall/5, "
+                       "emqx_plugin_libs_rule:cluster_call/3").
 %% List of functions in the RPC backend modules that we can ignore:
--define(IGNORED_RPC_CALLS, "gen_rpc:nodes/0").
+-define(IGNORED_RPC_CALLS, "gen_rpc:nodes/0, emqx_rpc:unwrap_erpc/1, rpc:pmap/3"). % TODO: handle pmap
+%% List of business-layer functions that are exempt from the checks:
+-define(EXEMPTIONS,
+        "emqx_mgmt_api:do_query/6,"             % Reason: legacy code. A fun and a QC query are
+                                                % passed in the args, it's futile to try to statically
+                                                % check it
+        "emqx_plugin_libs_rule:cluster_call/3"  % Reason: some sort of external plugin API that we
+                                                % don't want to break?
+       ).
 
 -define(XREF, myxref).
 
@@ -61,15 +70,20 @@
 
 -spec run() -> boolean().
 run() ->
-    dump(), %% TODO: check return value
-    Dumps = filelib:wildcard(dumps_dir() ++ "/*.bpapi"),
-    case Dumps of
-        [] ->
-            ?ERROR("No BPAPI dumps are found in ~s, abort", [dumps_dir()]),
-            false;
-        _ ->
-            ?NOTICE("Running API compatibility checks for ~p", [Dumps]),
-            check_compat(Dumps)
+    case dump() of
+        true ->
+            Dumps = filelib:wildcard(dumps_dir() ++ "/*.bpapi"),
+            case Dumps of
+                [] ->
+                    ?ERROR("No BPAPI dumps are found in ~s, abort", [dumps_dir()]),
+                    false;
+                _ ->
+                    ?NOTICE("Running API compatibility checks for ~p", [Dumps]),
+                    check_compat(Dumps)
+            end;
+        false ->
+            ?CRITICAL("Backplane API violations found on the current branch."),
+            false
     end.
 
 -spec check_compat([file:filename()]) -> boolean().
@@ -207,8 +221,8 @@ prepare(#{reldir := RelDir, plt := PLT}) ->
     dialyzer_plt:from_file(PLT).
 
 find_remote_calls(_Opts) ->
-    Query = "XC | (A - [" ?IGNORED_APPS "]:App - [" ?IGNORED_MODULES "] : Mod)
-               || (([" ?RPC_MODULES "] : Mod + [" ?RPC_FUNCTIONS "]) - " ?IGNORED_RPC_CALLS ")",
+    Query = "XC | (A - [" ?IGNORED_APPS "]:App - [" ?IGNORED_MODULES "]:Mod - [" ?EXEMPTIONS "])
+               || (([" ?RPC_MODULES "] : Mod + [" ?RPC_FUNCTIONS "]) - [" ?IGNORED_RPC_CALLS "])",
     {ok, Calls} = xref:q(?XREF, Query),
     ?INFO("Calls to RPC modules ~p", [Calls]),
     {Callers, _Callees} = lists:unzip(Calls),

+ 3 - 1
apps/emqx/test/emqx_bpapi_suite.erl

@@ -19,6 +19,7 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-include_lib("emqx/include/logger.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("stdlib/include/assert.hrl").
 
@@ -28,7 +29,8 @@ init_per_suite(Config) ->
     Config.
 
 end_per_suite(_Config) ->
-    ok.
+    ?NOTICE("If this test suite failed, and you are unsure why, read this:~n"
+            "https://github.com/emqx/emqx/blob/master/apps/emqx/src/bpapi/README.md", []).
 
 t_run_check(_) ->
     ?assertMatch(true, emqx_bpapi_static_checks:run()).

+ 3 - 5
apps/emqx/test/emqx_cm_SUITE.erl

@@ -283,6 +283,7 @@ flush_emqx_pool() ->
 t_discard_session_race(_) ->
     ClientId = rand_client_id(),
     ?check_trace(
+       #{timetrap => 60000},
        begin
          #{conninfo := ConnInfo0} = ?ChanInfo,
          ConnInfo = ConnInfo0#{conn_mod := emqx_ws_connection},
@@ -290,12 +291,9 @@ t_discard_session_race(_) ->
          ok = emqx_cm:register_channel(ClientId, Pid, ConnInfo),
          Pid ! stop,
          receive {'DOWN', Ref, process, Pid, normal} -> ok end,
-         ok = emqx_cm:discard_session(ClientId),
-         {ok, _} = ?block_until(#{?snk_kind := "session_already_gone", pid := Pid}, 1000)
+         ?assertMatch(ok, emqx_cm:discard_session(ClientId))
        end,
-       fun(_, _) ->
-               true
-       end).
+       []).
 
 t_takeover_session(_) ->
     #{conninfo := ConnInfo} = ?ChanInfo,

+ 12 - 12
apps/emqx/test/emqx_trace_SUITE.erl

@@ -62,7 +62,7 @@ t_base_create_delete(_Config) ->
         end_at => End
     },
     AnotherTrace = Trace#{name => <<"anotherTrace">>},
-    ok = emqx_trace:create(Trace),
+    {ok, _} = emqx_trace:create(Trace),
     ?assertEqual({error, {already_existed, Name}}, emqx_trace:create(Trace)),
     ?assertEqual({error, {duplicate_condition, Name}}, emqx_trace:create(AnotherTrace)),
     [TraceRec] = emqx_trace:list(),
@@ -95,13 +95,13 @@ t_create_size_max(_Config) ->
         Name = list_to_binary("name" ++ integer_to_list(Seq)),
         Trace = [{name, Name}, {type, topic},
             {topic, list_to_binary("/x/y/" ++ integer_to_list(Seq))}],
-        ok = emqx_trace:create(Trace)
+        {ok, _} = emqx_trace:create(Trace)
               end, lists:seq(1, 30)),
     Trace31 = [{<<"name">>, <<"name31">>},
         {<<"type">>, topic}, {<<"topic">>, <<"/x/y/31">>}],
     {error, _} = emqx_trace:create(Trace31),
     ok = emqx_trace:delete(<<"name30">>),
-    ok = emqx_trace:create(Trace31),
+    {ok, _} = emqx_trace:create(Trace31),
     ?assertEqual(30, erlang:length(emqx_trace:list())),
     ok.
 
@@ -145,7 +145,7 @@ t_create_failed(_Config) ->
 
 t_create_default(_Config) ->
     {error, "name required"} = emqx_trace:create([]),
-    ok = emqx_trace:create([{<<"name">>, <<"test-name">>},
+    {ok, _} = emqx_trace:create([{<<"name">>, <<"test-name">>},
         {<<"type">>, clientid}, {<<"clientid">>, <<"good">>}]),
     [#emqx_trace{name = <<"test-name">>}] = emqx_trace:list(),
     ok = emqx_trace:clear(),
@@ -166,7 +166,7 @@ t_create_default(_Config) ->
         {<<"end_at">>, to_rfc3339(Now + 3)}
     ],
     {error, "failed by start_at >= end_at"} = emqx_trace:create(Trace2),
-    ok = emqx_trace:create([{<<"name">>, <<"test-name">>},
+    {ok, _} = emqx_trace:create([{<<"name">>, <<"test-name">>},
         {<<"type">>, topic}, {<<"topic">>, <<"/x/y/z">>}]),
     [#emqx_trace{start_at = Start, end_at = End}] = emqx_trace:list(),
     ?assertEqual(10 * 60, End - Start),
@@ -182,7 +182,7 @@ t_create_with_extra_fields(_Config) ->
         {<<"clientid">>, <<"dev001">>},
         {<<"ip_address">>, <<"127.0.0.1">>}
     ],
-    ok = emqx_trace:create(Trace),
+    {ok, _} = emqx_trace:create(Trace),
     ?assertMatch([#emqx_trace{name = <<"test-name">>, filter = <<"/x/y/z">>, type = topic}],
         emqx_trace:list()),
     ok.
@@ -191,7 +191,7 @@ t_update_enable(_Config) ->
     Name = <<"test-name">>,
     Now = erlang:system_time(second),
     End = list_to_binary(calendar:system_time_to_rfc3339(Now + 2)),
-    ok = emqx_trace:create([{<<"name">>, Name}, {<<"type">>, topic},
+    {ok, _} = emqx_trace:create([{<<"name">>, Name}, {<<"type">>, topic},
         {<<"topic">>, <<"/x/y/z">>}, {<<"end_at">>, End}]),
     [#emqx_trace{enable = Enable}] = emqx_trace:list(),
     ?assertEqual(Enable, true),
@@ -219,8 +219,8 @@ t_load_state(_Config) ->
     Finished = [{<<"name">>, <<"Finished">>}, {<<"type">>, topic},
         {<<"topic">>, <<"/x/y/3">>}, {<<"start_at">>, to_rfc3339(Now - 5)},
         {<<"end_at">>, to_rfc3339(Now)}],
-    ok = emqx_trace:create(Running),
-    ok = emqx_trace:create(Waiting),
+    {ok, _} = emqx_trace:create(Running),
+    {ok, _} = emqx_trace:create(Waiting),
     {error, "end_at time has already passed"} = emqx_trace:create(Finished),
     Traces = emqx_trace:format(emqx_trace:list()),
     ?assertEqual(2, erlang:length(Traces)),
@@ -241,7 +241,7 @@ t_client_event(_Config) ->
     Now = erlang:system_time(second),
     Start = to_rfc3339(Now),
     Name = <<"test_client_id_event">>,
-    ok = emqx_trace:create([{<<"name">>, Name},
+    {ok, _} = emqx_trace:create([{<<"name">>, Name},
         {<<"type">>, clientid}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
     ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
     {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
@@ -250,7 +250,7 @@ t_client_event(_Config) ->
     ok = emqtt:publish(Client, <<"/test">>, #{}, <<"1">>, [{qos, 0}]),
     ok = emqtt:publish(Client, <<"/test">>, #{}, <<"2">>, [{qos, 0}]),
     ok = emqx_trace_handler_SUITE:filesync(Name, clientid),
-    ok = emqx_trace:create([{<<"name">>, <<"test_topic">>},
+    {ok, _} = emqx_trace:create([{<<"name">>, <<"test_topic">>},
         {<<"type">>, topic}, {<<"topic">>, <<"/test">>}, {<<"start_at">>, Start}]),
     ok = emqx_trace_handler_SUITE:filesync(<<"test_topic">>, topic),
     {ok, Bin} = file:read_file(emqx_trace:log_file(Name, Now)),
@@ -279,7 +279,7 @@ t_get_log_filename(_Config) ->
         {<<"start_at">>, list_to_binary(Start)},
         {<<"end_at">>, list_to_binary(End)}
     ],
-    ok = emqx_trace:create(Trace),
+    {ok, _} = emqx_trace:create(Trace),
     ?assertEqual({error, not_found}, emqx_trace:get_trace_filename(<<"test">>)),
     ?assertEqual(ok, element(1, emqx_trace:get_trace_filename(Name))),
     ct:sleep(3000),

+ 6 - 4
apps/emqx_conf/src/emqx_cluster_rpc.erl

@@ -29,7 +29,7 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
          handle_continue/2, code_change/3]).
 
--export_type([txn_id/0, succeed_num/0, multicall_return/0]).
+-export_type([txn_id/0, succeed_num/0, multicall_return/1, multicall_return/0]).
 
 -ifdef(TEST).
 -compile(export_all).
@@ -48,9 +48,11 @@
 
 -type succeed_num() :: pos_integer() | all.
 
--type multicall_return() :: {ok, txn_id(), _Result}
-                          | {error, term()}
-                          | {retry, txn_id(), _Result, node()}.
+-type multicall_return(Result) :: {ok, txn_id(), Result}
+                                | {error, term()}
+                                | {retry, txn_id(), Result, node()}.
+
+-type multicall_return() :: multicall_return(_).
 
 %%%===================================================================
 %%% API

+ 17 - 0
apps/emqx_dashboard/src/emqx_dashboard_swagger.erl

@@ -1,3 +1,19 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
 -module(emqx_dashboard_swagger).
 
 -include_lib("typerefl/include/types.hrl").
@@ -313,6 +329,7 @@ responses(Responses, Module) ->
 response(Status, Bin, {Acc, RefsAcc, Module}) when is_binary(Bin) ->
     {Acc#{integer_to_binary(Status) => #{description => Bin}}, RefsAcc, Module};
 %% Support swagger raw object(file download).
+%% TODO: multi type response(i.e. Support both 'application/json' and 'plain/text')
 response(Status, #{content := _} = Content, {Acc, RefsAcc, Module}) ->
     {Acc#{integer_to_binary(Status) => Content}, RefsAcc, Module};
 response(Status, ?REF(StructName), {Acc, RefsAcc, Module}) ->

+ 9 - 18
apps/emqx_exhook/src/emqx_exhook_api.erl

@@ -32,9 +32,6 @@
 -define(BAD_REQUEST, 'BAD_REQUEST').
 -define(BAD_RPC, 'BAD_RPC').
 
--type rpc_result() :: {error, any()}
-                    | any().
-
 -dialyzer([{nowarn_function, [ fill_cluster_server_info/5
                              , nodes_server_info/5
                              , fill_server_hooks_info/4
@@ -285,7 +282,7 @@ get_nodes_server_info(Name) ->
 %% GET /exhooks
 %%--------------------------------------------------------------------
 nodes_all_server_info(ConfL) ->
-    AllInfos = call_cluster(emqx_exhook_mgr, all_servers_info, []),
+    AllInfos = call_cluster(fun(Nodes) -> emqx_exhook_proto_v1:all_servers_info(Nodes) end),
     Default = emqx_exhook_metrics:new_metrics_info(),
     node_all_server_info(ConfL, AllInfos, Default, []).
 
@@ -324,7 +321,7 @@ fill_cluster_server_info([], StatusL, MetricsL, ServerName, _) ->
 %% GET /exhooks/{name}
 %%--------------------------------------------------------------------
 nodes_server_info(Name) ->
-    InfoL = call_cluster(emqx_exhook_mgr, server_info, [Name]),
+    InfoL = call_cluster(fun(Nodes) -> emqx_exhook_proto_v1:server_info(Nodes, Name) end),
     Default = emqx_exhook_metrics:new_metrics_info(),
     nodes_server_info(InfoL, Name, Default, [], []).
 
@@ -359,7 +356,7 @@ get_nodes_server_hooks_info(Name) ->
     case emqx_exhook_mgr:hooks(Name) of
         [] -> [];
         Hooks ->
-            AllInfos = call_cluster(emqx_exhook_mgr, server_hooks_metrics, [Name]),
+            AllInfos = call_cluster(fun(Nodes) -> emqx_exhook_proto_v1:server_hooks_metrics(Nodes, Name) end),
             Default = emqx_exhook_metrics:new_metrics_info(),
             get_nodes_server_hooks_info(Hooks, AllInfos, Default, [])
     end.
@@ -387,16 +384,10 @@ fill_server_hooks_info([], _Name, _Default, MetricsL) ->
 %%--------------------------------------------------------------------
 %% cluster call
 %%--------------------------------------------------------------------
-call_cluster(Module, Fun, Args) ->
-    Nodes = mria_mnesia:running_nodes(),
-    [{Node, rpc_call(Node, Module, Fun, Args)} || Node <- Nodes].
-
--spec rpc_call(node(), atom(), atom(), list()) -> rpc_result().
-rpc_call(Node, Module, Fun, Args) when Node =:= node() ->
-    erlang:apply(Module, Fun, Args);
 
-rpc_call(Node, Module, Fun, Args) ->
-    case rpc:call(Node, Module, Fun, Args) of
-        {badrpc, Reason} -> {error, Reason};
-        Res -> Res
-    end.
+-spec call_cluster(fun(([node()]) -> emqx_rpc:erpc_multicall(A))) ->
+          [{node(), A | {error, _Err}}].
+call_cluster(Fun) ->
+    Nodes = mria_mnesia:running_nodes(),
+    Ret = Fun(Nodes),
+    lists:zip(Nodes, lists:map(fun emqx_rpc:unwrap_erpc/1, Ret)).

+ 46 - 0
apps/emqx_exhook/src/proto/emqx_exhook_proto_v1.erl

@@ -0,0 +1,46 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_exhook_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([ introduced_in/0
+
+        , all_servers_info/1
+        , server_info/2
+        , server_hooks_metrics/2
+        ]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.0.0".
+
+-spec all_servers_info([node()]) ->
+          emqx_rpc:erpc_multicall(map()).
+all_servers_info(Nodes) ->
+    erpc:multicall(Nodes, emqx_exhook_mgr, all_servers_info, []).
+
+-spec server_info([node()], emqx_exhook_mgr:server_name()) ->
+          emqx_rpc:erpc_multicall(map()).
+server_info(Nodes, Name) ->
+    erpc:multicall(Nodes, emqx_exhook_mgr, server_info, [Name]).
+
+-spec server_hooks_metrics([node()], emqx_exhook_mgr:server_name()) ->
+          emqx_rpc:erpc_multicall(emqx_exhook_metrics:hooks_metrics()).
+server_hooks_metrics(Nodes, Name) ->
+    erpc:multicall(Nodes, emqx_exhook_mgr, server_hooks_metrics, [Name]).

+ 2 - 1
apps/emqx_gateway/src/emqx_gateway_api_clients.erl

@@ -304,10 +304,11 @@ run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _},
 %% format funcs
 
 format_channel_info({_, Infos, Stats} = R) ->
+    Node = maps:get(node, Infos, node()),
     ClientInfo = maps:get(clientinfo, Infos, #{}),
     ConnInfo = maps:get(conninfo, Infos, #{}),
     SessInfo = maps:get(session, Infos, #{}),
-    FetchX = [ {node, ClientInfo, node()}
+    FetchX = [ {node, ClientInfo, Node}
              , {clientid, ClientInfo}
              , {username, ClientInfo}
              , {proto_name, ConnInfo}

+ 97 - 38
apps/emqx_gateway/src/emqx_gateway_cm.erl

@@ -36,6 +36,7 @@
         , register_channel/4
         , unregister_channel/2
         , insert_channel_info/4
+        , lookup_by_clientid/2
         , set_chan_info/3
         , set_chan_info/4
         , get_chan_info/2
@@ -63,6 +64,20 @@
         , code_change/3
         ]).
 
+%% RPC targets
+-export([ do_lookup_by_clientid/2
+        , do_get_chan_info/3
+        , do_set_chan_info/4
+        , do_get_chan_stats/3
+        , do_set_chan_stats/4
+        , do_discard_session/3
+        , do_kick_session/3
+        , do_get_chann_conn_mod/3
+        ]).
+
+-export_type([ gateway_name/0
+             ]).
+
 -record(state, {
           gwname    :: gateway_name(), %% Gateway Name
           locker    :: pid(),          %% ClientId Locker for CM
@@ -146,16 +161,38 @@ get_chan_info(GwName, ClientId) ->
             get_chan_info(GwName, ClientId, ChanPid)
         end).
 
--spec get_chan_info(gateway_name(), emqx_types:clientid(), pid())
+-spec do_lookup_by_clientid(gateway_name(), emqx_types:clientid()) ->
+          [pid()].
+do_lookup_by_clientid(GwName, ClientId) ->
+    ChanTab = emqx_gateway_cm:tabname(chan, GwName),
+    [Pid || {_, Pid} <- ets:lookup(ChanTab, ClientId)].
+
+-spec do_get_chan_info(gateway_name(), emqx_types:clientid(), pid())
       -> emqx_types:infos() | undefined.
-get_chan_info(GwName, ClientId, ChanPid) when node(ChanPid) == node() ->
+do_get_chan_info(GwName, ClientId, ChanPid) ->
     Chan = {ClientId, ChanPid},
-    try ets:lookup_element(tabname(info, GwName), Chan, 2)
+    try
+        Info = ets:lookup_element(tabname(info, GwName), Chan, 2),
+        Info#{node => node()}
     catch
         error:badarg -> undefined
-    end;
+    end.
+
+-spec get_chan_info(gateway_name(), emqx_types:clientid(), pid())
+      -> emqx_types:infos() | undefined.
 get_chan_info(GwName, ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), get_chan_info, [GwName, ClientId, ChanPid]).
+    wrap_rpc(emqx_gateway_cm_proto_v1:get_chan_info(GwName, ClientId, ChanPid)).
+
+-spec lookup_by_clientid(gateway_name(), emqx_types:clientid()) ->
+          [pid()].
+lookup_by_clientid(GwName, ClientId) ->
+    Nodes = mria_mnesia:running_nodes(),
+    case emqx_gateway_cm_proto_v1:lookup_by_clientid(Nodes, GwName, ClientId) of
+        {Pids, []} ->
+            lists:append(Pids);
+        {_, _BadNodes} ->
+            error(badrpc)
+    end.
 
 %% @doc Update infos of the channel.
 -spec set_chan_info(gateway_name(),
@@ -164,18 +201,23 @@ get_chan_info(GwName, ClientId, ChanPid) ->
 set_chan_info(GwName, ClientId, Infos) ->
     set_chan_info(GwName, ClientId, self(), Infos).
 
--spec set_chan_info(gateway_name(),
-                    emqx_types:clientid(),
-                    pid(),
-                    emqx_types:infos()) -> boolean().
-set_chan_info(GwName, ClientId, ChanPid, Infos) when node(ChanPid) == node() ->
+-spec do_set_chan_info(gateway_name(),
+                       emqx_types:clientid(),
+                       pid(),
+                       emqx_types:infos()) -> boolean().
+do_set_chan_info(GwName, ClientId, ChanPid, Infos) ->
     Chan = {ClientId, ChanPid},
     try ets:update_element(tabname(info, GwName), Chan, {2, Infos})
     catch
         error:badarg -> false
-    end;
+    end.
+
+-spec set_chan_info(gateway_name(),
+                    emqx_types:clientid(),
+                    pid(),
+                    emqx_types:infos()) -> boolean().
 set_chan_info(GwName, ClientId, ChanPid, Infos) ->
-    rpc_call(node(ChanPid), set_chan_info, [GwName, ClientId, ChanPid, Infos]).
+    wrap_rpc(emqx_gateway_cm_proto_v1:set_chan_info(GwName, ClientId, ChanPid, Infos)).
 
 %% @doc Get channel's stats.
 -spec get_chan_stats(gateway_name(), emqx_types:clientid())
@@ -186,16 +228,19 @@ get_chan_stats(GwName, ClientId) ->
             get_chan_stats(GwName, ClientId, ChanPid)
         end).
 
--spec get_chan_stats(gateway_name(), emqx_types:clientid(), pid())
+-spec do_get_chan_stats(gateway_name(), emqx_types:clientid(), pid())
       -> emqx_types:stats() | undefined.
-get_chan_stats(GwName, ClientId, ChanPid) when node(ChanPid) == node() ->
+do_get_chan_stats(GwName, ClientId, ChanPid) ->
     Chan = {ClientId, ChanPid},
     try ets:lookup_element(tabname(info, GwName), Chan, 3)
     catch
         error:badarg -> undefined
-    end;
+    end.
+
+-spec get_chan_stats(gateway_name(), emqx_types:clientid(), pid())
+      -> emqx_types:stats() | undefined.
 get_chan_stats(GwName, ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), get_chan_stats, [GwName, ClientId, ChanPid]).
+    wrap_rpc(emqx_gateway_cm_proto_v1:get_chan_stats(GwName, ClientId, ChanPid)).
 
 -spec set_chan_stats(gateway_name(),
                      emqx_types:clientid(),
@@ -203,19 +248,23 @@ get_chan_stats(GwName, ClientId, ChanPid) ->
 set_chan_stats(GwName, ClientId, Stats) ->
     set_chan_stats(GwName, ClientId, self(), Stats).
 
+-spec do_set_chan_stats(gateway_name(),
+                        emqx_types:clientid(),
+                        pid(),
+                        emqx_types:stats()) -> boolean().
+do_set_chan_stats(GwName, ClientId, ChanPid, Stats) ->
+    Chan = {ClientId, ChanPid},
+    try ets:update_element(tabname(info, GwName), Chan, {3, Stats})
+    catch
+        error:badarg -> false
+    end.
+
 -spec set_chan_stats(gateway_name(),
                      emqx_types:clientid(),
                      pid(),
                      emqx_types:stats()) -> boolean().
-set_chan_stats(GwName, ClientId, ChanPid, Stats)
-  when node(ChanPid) == node() ->
-    Chan = {ClientId, self()},
-    try ets:update_element(tabname(info, GwName), Chan, {3, Stats})
-    catch
-        error:badarg -> false
-    end;
 set_chan_stats(GwName, ClientId, ChanPid, Stats) ->
-    rpc_call(node(ChanPid), set_chan_stats, [GwName, ClientId, ChanPid, Stats]).
+    wrap_rpc(emqx_gateway_cm_proto_v1:set_chan_stats(GwName, ClientId, ChanPid, Stats)).
 
 -spec connection_closed(gateway_name(), emqx_types:clientid()) -> true.
 connection_closed(GwName, ClientId) ->
@@ -297,11 +346,11 @@ create_session(GwName, ClientInfo, ConnInfo, CreateSessionFun, SessionMod) ->
 discard_session(GwName, ClientId) when is_binary(ClientId) ->
     case lookup_channels(GwName, ClientId) of
         [] -> ok;
-        ChanPids -> lists:foreach(fun(Pid) -> do_discard_session(GwName, ClientId, Pid) end, ChanPids)
+        ChanPids -> lists:foreach(fun(Pid) -> safe_discard_session(GwName, ClientId, Pid) end, ChanPids)
     end.
 
 %% @private
-do_discard_session(GwName, ClientId, Pid) ->
+safe_discard_session(GwName, ClientId, Pid) ->
     try
         discard_session(GwName, ClientId, Pid)
     catch
@@ -315,17 +364,20 @@ do_discard_session(GwName, ClientId, Pid) ->
             ok
     end.
 
-%% @private
-discard_session(GwName, ClientId, ChanPid) when node(ChanPid) == node() ->
+-spec do_discard_session(gateway_name(), emqx_types:clientid(), pid()) ->
+          _.
+do_discard_session(GwName, ClientId, ChanPid) ->
     case get_chann_conn_mod(GwName, ClientId, ChanPid) of
         undefined -> ok;
         ConnMod when is_atom(ConnMod) ->
             ConnMod:call(ChanPid, discard, ?T_TAKEOVER)
-    end;
+    end.
 
 %% @private
+-spec discard_session(gateway_name(), emqx_types:clientid(), pid()) ->
+          _.
 discard_session(GwName, ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), discard_session, [GwName, ClientId, ChanPid]).
+    wrap_rpc(emqx_gateway_cm_proto_v1:discard_session(GwName, ClientId, ChanPid)).
 
 -spec kick_session(gateway_name(), emqx_types:clientid())
     -> {error, any()}
@@ -346,16 +398,20 @@ kick_session(GwName, ClientId) ->
             kick_session(GwName, ClientId, ChanPid)
     end.
 
-kick_session(GwName, ClientId, ChanPid) when node(ChanPid) == node() ->
+-spec do_kick_session(gateway_name(), emqx_types:clientid(), pid()) ->
+          _.
+do_kick_session(GwName, ClientId, ChanPid) ->
     case get_chan_info(GwName, ClientId, ChanPid) of
         #{conninfo := #{conn_mod := ConnMod}} ->
             ConnMod:call(ChanPid, kick, ?T_TAKEOVER);
         undefined ->
             {error, not_found}
-    end;
+    end.
 
+-spec kick_session(gateway_name(), emqx_types:clientid(), pid()) ->
+          _.
 kick_session(GwName, ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), kick_session, [GwName, ClientId, ChanPid]).
+    wrap_rpc(emqx_gateway_cm_proto_v1:kick_session(GwName, ClientId, ChanPid)).
 
 with_channel(GwName, ClientId, Fun) ->
     case lookup_channels(GwName, ClientId) of
@@ -369,14 +425,17 @@ with_channel(GwName, ClientId, Fun) ->
 lookup_channels(GwName, ClientId) ->
     emqx_gateway_cm_registry:lookup_channels(GwName, ClientId).
 
-get_chann_conn_mod(GwName, ClientId, ChanPid) when node(ChanPid) == node() ->
+-spec do_get_chann_conn_mod(gateway_name(), emqx_types:clientid(), pid()) -> atom().
+do_get_chann_conn_mod(GwName, ClientId, ChanPid) ->
     Chan = {ClientId, ChanPid},
     try [ConnMod] = ets:lookup_element(tabname(conn, GwName), Chan, 2), ConnMod
     catch
         error:badarg -> undefined
-    end;
+    end.
+
+-spec get_chann_conn_mod(gateway_name(), emqx_types:clientid(), pid()) -> atom().
 get_chann_conn_mod(GwName, ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), get_chann_conn_mod, [GwName, ClientId, ChanPid]).
+    wrap_rpc(emqx_gateway_cm_proto_v1:get_chann_conn_mod(GwName, ClientId, ChanPid)).
 
 %% Locker
 
@@ -398,8 +457,8 @@ locker_unlock(Locker, ClientId) ->
     ekka_locker:release(Locker, ClientId, quorum).
 
 %% @private
-rpc_call(Node, Fun, Args) ->
-    case rpc:call(Node, ?MODULE, Fun, Args) of
+wrap_rpc(Ret) ->
+    case Ret of
         {badrpc, Reason} -> error(Reason);
         Res -> Res
     end.

+ 15 - 36
apps/emqx_gateway/src/emqx_gateway_http.erl

@@ -47,9 +47,7 @@
 
 %% Mgmt APIs - clients
 -export([ lookup_client/3
-        , lookup_client/4
         , kickout_client/2
-        , kickout_client/3
         , list_client_subscriptions/2
         , client_subscribe/4
         , client_unsubscribe/3
@@ -231,41 +229,28 @@ confexp({error, Reason}) -> error(Reason).
 %%--------------------------------------------------------------------
 
 -spec lookup_client(gateway_name(),
-                    emqx_types:clientid(), {atom(), atom()}) -> list().
-lookup_client(GwName, ClientId, FormatFun) ->
-    lists:append([lookup_client(Node, GwName, {clientid, ClientId}, FormatFun)
-                  || Node <- mria_mnesia:running_nodes()]).
-
-lookup_client(Node, GwName, {clientid, ClientId}, {M,F}) when Node =:= node() ->
-    ChanTab = emqx_gateway_cm:tabname(chan, GwName),
-    InfoTab = emqx_gateway_cm:tabname(info, GwName),
-
-    lists:append(lists:map(
-      fun(Key) ->
-        lists:map(fun M:F/1, ets:lookup(InfoTab, Key))
-      end, ets:lookup(ChanTab, ClientId)));
-
-lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) ->
-    rpc_call(Node, lookup_client,
-             [Node, GwName, {clientid, ClientId}, FormatFun]).
+                    emqx_types:clientid(), {module(), atom()}) -> list().
+lookup_client(GwName, ClientId, {M, F}) ->
+    [begin
+         Info = emqx_gateway_cm:get_chan_info(GwName, ClientId, Pid),
+         Stats = emqx_gateway_cm:get_chan_stats(GwName, ClientId, Pid),
+         M:F({{ClientId, Pid}, Info, Stats})
+     end
+     || Pid <- emqx_gateway_cm:lookup_by_clientid(GwName, ClientId)].
 
 -spec kickout_client(gateway_name(), emqx_types:clientid())
     -> {error, any()}
      | ok.
 kickout_client(GwName, ClientId) ->
-    Results = [kickout_client(Node, GwName, ClientId)
-               || Node <- mria_mnesia:running_nodes()],
-    case lists:any(fun(Item) -> Item =:= ok end, Results) of
-        true  -> ok;
-        false -> lists:last(Results)
+    Results = [emqx_gateway_cm:kick_session(GwName, ClientId, Pid)
+               || Pid <- emqx_gateway_cm:lookup_by_clientid(GwName, ClientId)],
+    IsOk = lists:any(fun(Item) -> Item =:= ok end, Results),
+    case {IsOk, Results} of
+        {true , _ } -> ok;
+        {_    , []} -> {error, not_found};
+        {false, _ } -> lists:last(Results)
     end.
 
-kickout_client(Node, GwName, ClientId) when Node =:= node() ->
-    emqx_gateway_cm:kick_session(GwName, ClientId);
-
-kickout_client(Node, GwName, ClientId) ->
-    rpc_call(Node, kickout_client, [Node, GwName, ClientId]).
-
 -spec list_client_subscriptions(gateway_name(), emqx_types:clientid())
     -> {error, any()}
      | {ok, list()}.
@@ -459,9 +444,3 @@ to_list(B) when is_binary(B) ->
 
 %%--------------------------------------------------------------------
 %% Internal funcs
-
-rpc_call(Node, Fun, Args) ->
-    case rpc:call(Node, ?MODULE, Fun, Args) of
-        {badrpc, Reason} -> {error, Reason};
-        Res -> Res
-    end.

+ 77 - 0
apps/emqx_gateway/src/proto/emqx_gateway_cm_proto_v1.erl

@@ -0,0 +1,77 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_gateway_cm_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([ introduced_in/0
+
+        , get_chan_info/3
+        , set_chan_info/4
+        , get_chan_stats/3
+        , set_chan_stats/4
+        , discard_session/3
+        , kick_session/3
+        , get_chann_conn_mod/3
+        , lookup_by_clientid/3
+        ]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.0.0".
+
+-spec lookup_by_clientid([node()], emqx_gateway_cm:gateway_name(), emqx_types:clientid()) ->
+          emqx_rpc:multicall_return([pid()]).
+lookup_by_clientid(Nodes, GwName, ClientId) ->
+    rpc:multicall(Nodes, emqx_gateway_cm, do_lookup_by_clientid, [GwName, ClientId]).
+
+-spec get_chan_info(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid())
+      -> emqx_types:infos() | undefined | {badrpc, _}.
+get_chan_info(GwName, ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_gateway_cm, do_get_chan_info, [GwName, ClientId, ChanPid]).
+
+-spec set_chan_info(emqx_gateway_cm:gateway_name(),
+                    emqx_types:clientid(),
+                    pid(),
+                    emqx_types:infos()) -> boolean() | {badrpc, _}.
+set_chan_info(GwName, ClientId, ChanPid, Infos) ->
+    rpc:call(node(ChanPid), emqx_gateway_cm, do_set_chan_info, [GwName, ClientId, ChanPid, Infos]).
+
+-spec get_chan_stats(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid())
+      -> emqx_types:stats() | undefined | {badrpc, _}.
+get_chan_stats(GwName, ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_gateway_cm, do_get_chan_stats, [GwName, ClientId, ChanPid]).
+
+-spec set_chan_stats(emqx_gateway_cm:gateway_name(),
+                     emqx_types:clientid(),
+                     pid(),
+                     emqx_types:stats()) -> boolean() | {badrpc, _}.
+set_chan_stats(GwName, ClientId, ChanPid, Stats) ->
+    rpc:call(node(ChanPid), emqx_gateway_cm, do_set_chan_stats, [GwName, ClientId, ChanPid, Stats]).
+
+-spec discard_session(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) -> _.
+discard_session(GwName, ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_gateway_cm, do_discard_session, [GwName, ClientId, ChanPid]).
+
+-spec kick_session(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) -> _.
+kick_session(GwName, ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_gateway_cm, do_kick_session, [GwName, ClientId, ChanPid]).
+
+-spec get_chann_conn_mod(emqx_gateway_cm:gateway_name(), emqx_types:clientid(), pid()) -> atom() | {badrpc, _}.
+get_chann_conn_mod(GwName, ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_gateway_cm, do_get_chann_conn_mod, [GwName, ClientId, ChanPid]).

+ 2 - 1
apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl

@@ -125,7 +125,7 @@ t_get_set_chan_info_stats(_) ->
       #{clientinfo => clientinfo(), conninfo => conninfo()}, []),
 
     %% Info: get/set
-    NInfo = #{newinfo => true},
+    NInfo = #{newinfo => true, node => node()},
     emqx_gateway_cm:set_chan_info(?GWNAME, ?CLIENTID, NInfo),
     ?assertEqual(
        NInfo,
@@ -200,6 +200,7 @@ t_kick_session(_) ->
         100 ->
             ?assert(false, "waiting kick msg timeout")
     end,
+    ?assertMatch({error, not_found}, emqx_gateway_http:kickout_client(?GWNAME, <<"i-dont-exist">>)),
     meck:unload(emqx_gateway_cm_registry).
 
 t_unexpected_handle(Conf) ->

+ 1 - 0
apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl

@@ -1799,6 +1799,7 @@ t_clients_api(_) ->
     %% kickout
     {204, _} =
         request(delete, "/gateway/mqttsn/clients/client_id_test1"),
+    timer:sleep(100),
     {200, #{data := []}} = request(get, "/gateway/mqttsn/clients"),
 
     send_disconnect_msg(Socket, undefined),

+ 7 - 4
apps/emqx_management/src/emqx_mgmt.erl

@@ -235,7 +235,6 @@ nodes_info_count(PropList) ->
 %%--------------------------------------------------------------------
 
 lookup_client({clientid, ClientId}, FormatFun) ->
-
     lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun)
                   || Node <- mria_mnesia:running_nodes()]);
 
@@ -244,9 +243,13 @@ lookup_client({username, Username}, FormatFun) ->
                   || Node <- mria_mnesia:running_nodes()]).
 
 lookup_client(Node, Key, {M, F}) ->
-    case wrap_rpc(emqx_broker_proto_v1:lookup_client(Node, Key)) of
+    case wrap_rpc(emqx_cm_proto_v1:lookup_client(Node, Key)) of
         {error, Err} -> {error, Err};
-        L            -> lists:map(fun M:F/1, L)
+        L            -> lists:map(fun({Chan, Info0, Stats}) ->
+                                          Info = Info0#{node => Node},
+                                          M:F({Chan, Info, Stats})
+                                  end,
+                                  L)
     end.
 
 kickout_client({ClientID, FormatFun}) ->
@@ -259,7 +262,7 @@ kickout_client({ClientID, FormatFun}) ->
     end.
 
 kickout_client(Node, ClientId) ->
-    wrap_rpc(emqx_broker_proto_v1:kickout_client(Node, ClientId)).
+    wrap_rpc(emqx_cm_proto_v1:kickout_client(Node, ClientId)).
 
 list_authz_cache(ClientId) ->
     call_client(ClientId, list_authz_cache).

+ 6 - 10
apps/emqx_management/src/emqx_mgmt_api.erl

@@ -197,12 +197,15 @@ do_cluster_query([Node | Tail] = Nodes, Tab, Qs, QueryFun, Continuation,
 %% Do Query (or rpc query)
 %%--------------------------------------------------------------------
 
-%% @private
+%% @private This function is exempt from BPAPI
 do_query(Node, Tab, Qs, {M,F}, Continuation, Limit) when Node =:= node() ->
     erlang:apply(M, F, [Tab, Qs, Continuation, Limit]);
 do_query(Node, Tab, Qs, QueryFun, Continuation, Limit) ->
-    rpc_call(Node, ?MODULE, do_query,
-             [Node, Tab, Qs, QueryFun, Continuation, Limit], 50000).
+    case rpc:call(Node, ?MODULE, do_query,
+                  [Node, Tab, Qs, QueryFun, Continuation, Limit], 50000) of
+        {badrpc, _} = R -> {error, R};
+        Ret -> Ret
+    end.
 
 sub_query_result(Len, Rows, Limit, Results, Meta) ->
     {Flag, NMeta} = judge_page_with_counting(Len, Meta),
@@ -219,13 +222,6 @@ sub_query_result(Len, Rows, Limit, Results, Meta) ->
         end,
     {NMeta, NResults}.
 
-%% @private
-rpc_call(Node, M, F, A, T) ->
-    case rpc:call(Node, M, F, A, T) of
-        {badrpc, _} = R -> {error, R};
-        Res -> Res
-    end.
-
 %%--------------------------------------------------------------------
 %% Table Select
 %%--------------------------------------------------------------------

+ 6 - 3
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -473,7 +473,7 @@ keepalive_api() ->
                 ],
             responses => #{
                 <<"404">> => emqx_mgmt_util:error_schema(<<"Client id not found">>),
-                <<"400">> => emqx_mgmt_util:error_schema(<<"">>, 'PARAMS_ERROR'),
+                <<"400">> => emqx_mgmt_util:error_schema(<<"">>, ['PARAMS_ERROR']),
                 <<"200">> => emqx_mgmt_util:schema(<<"ok">>)}}},
     {"/clients/:clientid/keepalive", Metadata, set_keepalive}.
 %%%==============================================================================================
@@ -732,13 +732,17 @@ run_fuzzy_filter(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr} |
 %% format funcs
 
 format_channel_info({_, ClientInfo, ClientStats}) ->
+    Node = case ClientInfo of
+               #{node := N} -> N;
+               _            -> node()
+           end,
     StatsMap = maps:without([memory, next_pkt_id, total_heap_size],
         maps:from_list(ClientStats)),
     ClientInfoMap0 = maps:fold(fun take_maps_from_inner/3, #{}, ClientInfo),
     {IpAddress, Port} = peername_dispart(maps:get(peername, ClientInfoMap0)),
     Connected      = maps:get(conn_state, ClientInfoMap0) =:= connected,
     ClientInfoMap1 = maps:merge(StatsMap, ClientInfoMap0),
-    ClientInfoMap2 = maps:put(node, node(), ClientInfoMap1),
+    ClientInfoMap2 = maps:put(node, Node, ClientInfoMap1),
     ClientInfoMap3 = maps:put(ip_address, IpAddress, ClientInfoMap2),
     ClientInfoMap4 = maps:put(port, Port, ClientInfoMap3),
     ClientInfoMap  = maps:put(connected, Connected, ClientInfoMap4),
@@ -801,4 +805,3 @@ format_authz_cache({{PubSub, Topic}, {AuthzResult, Timestamp}}) ->
        result => AuthzResult,
        updated_time => Timestamp
      }.
-

+ 2 - 2
apps/emqx_management/src/emqx_mgmt_api_configs.erl

@@ -158,12 +158,12 @@ configs(get, Params, _Req) ->
     case
         lists:member(Node, mria_mnesia:running_nodes())
         andalso
-        rpc:call(Node, ?MODULE, get_full_config, [])
+        emqx_management_proto_v1:get_full_config(Node)
     of
         false ->
             Message = list_to_binary(io_lib:format("Bad node ~p, reason not found", [Node])),
             {500, #{code => 'BAD_NODE', message => Message}};
-        {error, {badrpc, R}} ->
+        {badrpc, R} ->
             Message = list_to_binary(io_lib:format("Bad node ~p, reason ~p", [Node, R])),
             {500, #{code => 'BAD_NODE', message => Message}};
         Res ->

+ 19 - 3
apps/emqx_management/src/emqx_mgmt_api_trace.erl

@@ -261,7 +261,7 @@ trace(get, _Params) ->
     end;
 trace(post, #{body := Param}) ->
     case emqx_trace:create(Param) of
-        ok -> {200};
+        {ok, Trace0} -> {200, format_trace(Trace0)};
         {error, {already_existed, Name}} ->
             {400, #{
                 code => 'ALREADY_EXISTED',
@@ -280,11 +280,27 @@ trace(post, #{body := Param}) ->
     end;
 trace(delete, _Param) ->
     ok = emqx_trace:clear(),
-    {200}.
+    {204}.
+
+format_trace(Trace0) ->
+    [
+        #{start_at := Start, end_at := End,
+            enable := Enable, type := Type, filter := Filter} = Trace1
+    ] = emqx_trace:format([Trace0]),
+    Now = erlang:system_time(second),
+    LogSize = lists:foldl(fun(Node, Acc) -> Acc#{Node => 0} end, #{},
+        mria_mnesia:running_nodes()),
+    Trace2 = maps:without([enable, filter], Trace1),
+    Trace2#{log_size => LogSize
+        , Type => iolist_to_binary(Filter)
+        , start_at => list_to_binary(calendar:system_time_to_rfc3339(Start))
+        , end_at => list_to_binary(calendar:system_time_to_rfc3339(End))
+        , status => status(Enable, Start, End, Now)
+    }.
 
 delete_trace(delete, #{bindings := #{name := Name}}) ->
     case emqx_trace:delete(Name) of
-        ok -> {200};
+        ok -> {204};
         {error, not_found} -> ?NOT_FOUND(Name)
     end.
 

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_cli.erl

@@ -490,7 +490,7 @@ trace_cluster_on(Name, Type, Filter, DurationS0) ->
              , end_at => list_to_binary(calendar:system_time_to_rfc3339(Now + DurationS))
              },
     case emqx_trace:create(Trace) of
-        ok ->
+        {ok, _} ->
             emqx_ctl:print("cluster_trace ~p ~s ~s successfully~n", [Type, Filter, Name]);
         {error, Error} ->
             emqx_ctl:print("[error] cluster_trace ~s ~s=~s ~p~n",

+ 6 - 0
apps/emqx_management/src/proto/emqx_management_proto_v1.erl

@@ -32,6 +32,8 @@
         , unsubscribe/3
 
         , call_client/3
+
+        , get_full_config/1
         ]).
 
 -include_lib("emqx/include/bpapi.hrl").
@@ -77,3 +79,7 @@ unsubscribe(Node, ClientId, Topic) ->
 -spec call_client(node(), emqx_types:clientid(), term()) -> term().
 call_client(Node, ClientId, Req) ->
     rpc:call(Node, emqx_mgmt, do_call_client, [ClientId, Req]).
+
+-spec get_full_config(node()) -> map() | list() | {badrpc, _}.
+get_full_config(Node) ->
+    rpc:call(Node, emqx_mgmt_api_configs, get_full_config, []).

+ 4 - 4
apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl

@@ -69,7 +69,7 @@ t_http_test(_Config) ->
     ],
 
     {ok, Create} = request_api(post, api_path("trace"), Header, Trace),
-    ?assertEqual(<<>>, Create),
+    ?assertMatch(#{<<"name">> := Name}, json(Create)),
 
     {ok, List} = request_api(get, api_path("trace"), Header),
     [Data] = json(List),
@@ -107,7 +107,7 @@ t_http_test(_Config) ->
 
     %% clear
     {ok, Create1} = request_api(post, api_path("trace"), Header, Trace),
-    ?assertEqual(<<>>, Create1),
+    ?assertMatch(#{<<"name">> := Name}, json(Create1)),
 
     {ok, Clear} = request_api(delete, api_path("trace"), Header),
     ?assertEqual(<<>>, Clear),
@@ -141,7 +141,7 @@ create_trace(Name, ClientId, Start) ->
     ?check_trace(
         #{timetrap => 900},
         begin
-            ok = emqx_trace:create([{<<"name">>, Name},
+            {ok, _} = emqx_trace:create([{<<"name">>, Name},
                 {<<"type">>, clientid}, {<<"clientid">>, ClientId}, {<<"start_at">>, Start}]),
             ?block_until(#{?snk_kind := update_trace_done})
         end,
@@ -206,7 +206,7 @@ do_request_api(Method, Request) ->
         {error, {shutdown, server_closed}} ->
             {error, server_closed};
         {ok, {{"HTTP/1.1", Code, _}, _Headers, Return}}
-            when Code =:= 200 orelse Code =:= 201 ->
+            when Code =:= 200 orelse Code =:= 201 orelse Code =:= 204 ->
             {ok, Return};
         {ok, {Reason, _Header, Body}} ->
             {error, Reason, Body}

+ 19 - 0
apps/emqx_modules/include/emqx_modules.hrl

@@ -1,5 +1,24 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
 %% The destination URL for the telemetry data report
 -define(TELEMETRY_URL, "https://telemetry.emqx.io/api/telemetry").
 
 %% Interval for reporting telemetry data, Default: 7d
 -define(REPORT_INTERVAR, 604800).
+
+-define(API_TAG_MQTT, [<<"mqtt">>]).
+-define(API_SCHEMA_MODULE, emqx_modules_schema).

+ 6 - 5
apps/emqx_modules/src/emqx_delayed_api.erl

@@ -19,6 +19,7 @@
 -behaviour(minirest_api).
 
 -include_lib("typerefl/include/types.hrl").
+-include("emqx_modules.hrl").
 
 -import(hoconsc, [mk/2, ref/1, ref/2]).
 
@@ -62,7 +63,7 @@ schema("/mqtt/delayed") ->
     #{
         'operationId' => status,
         get => #{
-            tags => [<<"mqtt">>],
+            tags => ?API_TAG_MQTT,
             description => <<"Get delayed status">>,
             summary => <<"Get delayed status">>,
             responses => #{
@@ -70,7 +71,7 @@ schema("/mqtt/delayed") ->
             }
         },
         put => #{
-            tags => [<<"mqtt">>],
+            tags => ?API_TAG_MQTT,
             description => <<"Enable or disable delayed, set max delayed messages">>,
             'requestBody' => ref(emqx_modules_schema, "delayed"),
             responses => #{
@@ -85,7 +86,7 @@ schema("/mqtt/delayed") ->
 schema("/mqtt/delayed/messages/:msgid") ->
     #{'operationId' => delayed_message,
         get => #{
-            tags => [<<"mqtt">>],
+            tags => ?API_TAG_MQTT,
             description => <<"Get delayed message">>,
             parameters => [{msgid, mk(binary(), #{in => path, desc => <<"delay message ID">>})}],
             responses => #{
@@ -97,7 +98,7 @@ schema("/mqtt/delayed/messages/:msgid") ->
             }
         },
         delete => #{
-            tags => [<<"mqtt">>],
+            tags => ?API_TAG_MQTT,
             description => <<"Delete delayed message">>,
             parameters => [{msgid, mk(binary(), #{in => path, desc => <<"delay message ID">>})}],
             responses => #{
@@ -113,7 +114,7 @@ schema("/mqtt/delayed/messages") ->
     #{
         'operationId' => delayed_messages,
         get => #{
-            tags => [<<"mqtt">>],
+            tags => ?API_TAG_MQTT,
             description => <<"List delayed messages">>,
             parameters => [ref(emqx_dashboard_swagger, page), ref(emqx_dashboard_swagger, limit)],
             responses => #{

+ 29 - 25
apps/emqx_modules/src/emqx_event_message_api.erl

@@ -17,37 +17,41 @@
 
 -behaviour(minirest_api).
 
--export([api_spec/0]).
+-import(hoconsc, [mk/2, ref/2]).
+-include("emqx_modules.hrl").
 
--export([event_message/2]).
+-export([ api_spec/0
+        , paths/0
+        , schema/1
+        ]).
 
--import(emqx_mgmt_util, [ schema/1
-                        ]).
+-export([event_message/2]).
 
 api_spec() ->
-    {[event_message_api()], []}.
-
-conf_schema() ->
-    emqx_mgmt_api_configs:gen_schema(emqx:get_config([event_message])).
-
-event_message_api() ->
-    Path = "/mqtt/event_message",
-    Metadata = #{
-        get => #{
-            description => <<"Event Message">>,
-            responses => #{
-                <<"200">> => schema(conf_schema())
+    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
+
+paths() ->
+    ["/mqtt/event_message"].
+
+schema("/mqtt/event_message") ->
+    #{ 'operationId' => event_message
+     , get =>
+           #{ description => <<"Event Message">>
+            , tags => ?API_TAG_MQTT
+            , responses =>
+                  #{200 => status_schema(<<"Get Event Message config successfully">>)}
             }
-        },
-        put => #{
-            description => <<"Update Event Message">>,
-            'requestBody' => schema(conf_schema()),
-            responses => #{
-                <<"200">> => schema(conf_schema())
+     , put =>
+           #{ description => <<"Update Event Message">>
+            , tags => ?API_TAG_MQTT
+            , 'requestBody' => status_schema(<<"Update Event Message config">>)
+            , responses =>
+                  #{200 => status_schema(<<"Update Event Message config successfully">>)}
             }
-        }
-    },
-    {Path, Metadata, event_message}.
+     }.
+
+status_schema(Desc) ->
+    mk(ref(?API_SCHEMA_MODULE, "event_message"), #{in => body, desc => Desc}).
 
 event_message(get, _Params) ->
     {200, emqx_event_message:list()};

+ 31 - 11
apps/emqx_modules/src/emqx_modules_schema.erl

@@ -43,21 +43,41 @@ fields("delayed") ->
     ];
 
 fields("rewrite") ->
-    [ {action, sc(hoconsc:enum([subscribe, publish, all]), #{desc => "Action", example => publish})}
-    , {source_topic, sc(binary(), #{desc => "Origin Topic", example => "x/#"})}
-    , {dest_topic, sc(binary(), #{desc => "Destination Topic", example => "z/y/$1"})}
-    , {re, fun regular_expression/1 }
+    [ { action
+      , sc( hoconsc:enum([subscribe, publish, all])
+          , #{desc => <<"Action">>, example => publish})}
+    , { source_topic
+      , sc( binary()
+          , #{desc => <<"Origin Topic">>, example => "x/#"})}
+    , { dest_topic
+      , sc( binary()
+          , #{desc => <<"Destination Topic">>, example => "z/y/$1"})}
+    , { re, fun regular_expression/1 }
     ];
 
 
 fields("event_message") ->
-    [ {"$event/client_connected", sc(boolean(), #{default => false})}
-    , {"$event/client_disconnected", sc(boolean(), #{default => false})}
-    , {"$event/client_subscribed", sc(boolean(), #{default => false})}
-    , {"$event/client_unsubscribed", sc(boolean(), #{default => false})}
-    , {"$event/message_delivered", sc(boolean(), #{default => false})}
-    , {"$event/message_acked", sc(boolean(), #{default => false})}
-    , {"$event/message_dropped", sc(boolean(), #{default => false})}
+    [ { '$event/client_connected'
+      , sc( boolean()
+          , #{desc => <<"Client connected to EMQ X event">>, default => false})}
+    , { '$event/client_disconnected'
+      , sc(boolean()
+          , #{desc => <<"Client disconnected to EMQ X event">>, default => false})}
+    , { '$event/client_subscribed'
+      , sc( boolean()
+          , #{desc => <<"Client subscribe topic event">>, default => false})}
+    , { '$event/client_unsubscribed'
+      , sc( boolean()
+          , #{desc => <<"Client unsubscribe topic event">>, default => false})}
+    , { '$event/message_delivered'
+      , sc( boolean()
+          , #{desc => <<"Message delivered event">>, default => false})}
+    , { '$event/message_acked'
+      , sc( boolean()
+          , #{desc => <<"Message acked event">>, default => false})}
+    , { '$event/message_dropped'
+      , sc( boolean()
+          , #{desc => <<"Message dropped event">>, default => false})}
     ];
 
 fields("topic_metrics") ->

+ 3 - 1
apps/emqx_modules/src/emqx_rewrite_api.erl

@@ -17,6 +17,7 @@
 
 -behaviour(minirest_api).
 -include_lib("typerefl/include/types.hrl").
+-include("emqx_modules.hrl").
 
 -export([api_spec/0, paths/0, schema/1]).
 
@@ -42,7 +43,7 @@ schema("/mqtt/topic_rewrite") ->
     #{
         operationId => topic_rewrite,
         get => #{
-            tags => [mqtt],
+            tags => ?API_TAG_MQTT,
             description => <<"List rewrite topic.">>,
             responses => #{
                 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")),
@@ -51,6 +52,7 @@ schema("/mqtt/topic_rewrite") ->
         },
         put => #{
             description => <<"Update rewrite topic">>,
+            tags => ?API_TAG_MQTT,
             requestBody => hoconsc:mk(hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")),#{}),
             responses => #{
                 200 => hoconsc:mk(hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")),

+ 2 - 2
apps/emqx_modules/src/emqx_telemetry_api.erl

@@ -71,13 +71,13 @@ schema("/telemetry/data") ->
      }.
 
 status_schema(Desc) ->
-    mk(ref(?MODULE, status), #{desc => Desc}).
+    mk(ref(?MODULE, status), #{in => body, desc => Desc}).
 
 fields(status) ->
     [ { enable
       , mk( boolean()
           , #{ desc => <<"Telemetry status">>
-             , default => false
+             , default => true
              , example => false
              })
       }

+ 223 - 117
apps/emqx_modules/src/emqx_topic_metrics_api.erl

@@ -18,163 +18,258 @@
 
 -behaviour(minirest_api).
 
--import(emqx_mgmt_util, [ properties/1
-                        , schema/1
-                        , object_schema/1
-                        , object_schema/2
-                        , object_array_schema/2
-                        , error_schema/2
-                        ]).
+-include_lib("typerefl/include/types.hrl").
+-include("emqx_modules.hrl").
 
--export([api_spec/0]).
+-import( hoconsc
+       , [ mk/2
+         , ref/1
+         , ref/2
+         , array/1
+         , map/2]).
 
 -export([ topic_metrics/2
         , operate_topic_metrics/2
         ]).
 
--define(ERROR_TOPIC, 'ERROR_TOPIC').
+-export([ cluster_accumulation_metrics/0
+        , cluster_accumulation_metrics/1]).
 
--define(EXCEED_LIMIT, 'EXCEED_LIMIT').
+-export([ api_spec/0
+        , paths/0
+        , schema/1
+        , fields/1
+        ]).
 
+-define(ERROR_TOPIC, 'ERROR_TOPIC').
+-define(EXCEED_LIMIT, 'EXCEED_LIMIT').
 -define(BAD_TOPIC, 'BAD_TOPIC').
-
+-define(BAD_RPC, 'BAD_RPC').
 -define(BAD_REQUEST, 'BAD_REQUEST').
 
 api_spec() ->
-    {[
-        topic_metrics_api(),
-        operation_topic_metrics_api()
-    ],[]}.
-
-properties() ->
-    properties([
-        {topic, string},
-        {create_time, string, <<"Date time, rfc3339">>},
-        {reset_time, string, <<"Nullable. Date time, rfc3339.">>},
-        {metrics, object, [{'messages.dropped.count', integer},
-                           {'messages.dropped.rate', number},
-                           {'messages.in.count', integer},
-                           {'messages.in.rate', number},
-                           {'messages.out.count', integer},
-                           {'messages.out.rate', number},
-                           {'messages.qos0.in.count', integer},
-                           {'messages.qos0.in.rate', number},
-                           {'messages.qos0.out.count', integer},
-                           {'messages.qos0.out.rate', number},
-                           {'messages.qos1.in.count', integer},
-                           {'messages.qos1.in.rate', number},
-                           {'messages.qos1.out.count', integer},
-                           {'messages.qos1.out.rate', number},
-                           {'messages.qos2.in.count', integer},
-                           {'messages.qos2.in.rate', number},
-                           {'messages.qos2.out.count', integer},
-                           {'messages.qos2.out.rate', number}]}
-    ]).
-
-topic_metrics_api() ->
-    MetaData = #{
-        %% Get all nodes metrics and accumulate all of these
-        get => #{
-            description => <<"List topic metrics">>,
-            responses => #{
-                <<"200">> => object_array_schema(properties(), <<"List topic metrics">>)
+    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
+
+paths() ->
+    [ "/mqtt/topic_metrics"
+    , "/mqtt/topic_metrics/:topic"
+    ].
+
+
+schema("/mqtt/topic_metrics") ->
+    #{ 'operationId' => topic_metrics
+     , get =>
+           #{ description => <<"List topic metrics">>
+            , tags => ?API_TAG_MQTT
+            , responses  =>
+                  #{200  => mk(array(hoconsc:ref(topic_metrics)), #{ desc => <<"List all topic metrics">>})}
             }
-        },
-        put => #{
-            description => <<"Reset topic metrics by topic name, or all">>,
-            'requestBody' => object_schema(properties([
-                {topic, string, <<"no topic will reset all">>},
-                {action, string, <<"Action, default reset">>, [reset]}
-            ])),
-            responses => #{
-                <<"200">> => schema(<<"Reset topic metrics success">>),
-                <<"404">> => error_schema(<<"Topic not found">>, [?ERROR_TOPIC])
+     , put =>
+           #{ description => <<"Reset topic metrics by topic name. Or reset all Topic Metrics">>
+            , tags => ?API_TAG_MQTT
+            , 'requestBody' => emqx_dashboard_swagger:schema_with_examples(
+                                 ref(reset),
+                                 reset_examples())
+            , responses =>
+                  #{ 204 => <<"Reset topic metrics successfully">>
+                   , 404 => emqx_dashboard_swagger:error_codes([?ERROR_TOPIC], <<"Topic not found">>)
+                   }
             }
-        },
-        post => #{
-            description => <<"Create topic metrics">>,
-            'requestBody' => object_schema(properties([{topic, string}])),
-            responses => #{
-                <<"200">> => schema(<<"Create topic metrics success">>),
-                <<"409">> => error_schema(<<"Topic metrics max limit">>, [?EXCEED_LIMIT]),
-                <<"400">> => error_schema( <<"Topic metrics already exist or bad topic">>
-                                         , [?BAD_REQUEST, ?BAD_TOPIC])
+     , post =>
+           #{ description => <<"Create topic metrics">>
+            , tags => ?API_TAG_MQTT
+            , 'requestBody' => [topic(body)]
+            , responses =>
+                  #{ 204 => <<"Create topic metrics success">>
+                   , 409 => emqx_dashboard_swagger:error_codes([?EXCEED_LIMIT], <<"Topic metrics exceeded max limit 512">>)
+                   , 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST, ?BAD_TOPIC], <<"Topic metrics already existed or bad topic">>)
+                   }
             }
-        }
-    },
-    {"/mqtt/topic_metrics", MetaData, topic_metrics}.
-
-operation_topic_metrics_api() ->
-    MetaData = #{
-        get => #{
-            description => <<"Get topic metrics">>,
-            parameters => [topic_param()],
-            responses => #{
-                <<"200">> => object_schema(properties(), <<"Topic metrics">>),
-                <<"404">> => error_schema(<<"Topic not found">>, [?ERROR_TOPIC])
-            }},
-        delete => #{
-            description => <<"Deregister topic metrics">>,
-            parameters => [topic_param()],
-            responses => #{
-                <<"204">> => schema(<<"Deregister topic metrics">>),
-                <<"404">> => error_schema(<<"Topic not found">>, [?ERROR_TOPIC])
+     };
+schema("/mqtt/topic_metrics/:topic") ->
+    #{ 'operationId' =>  operate_topic_metrics
+     , get =>
+           #{ description => <<"Get topic metrics">>
+            , tags => ?API_TAG_MQTT
+            , parameters => [topic(path)]
+            , responses =>
+                  #{ 200 => mk(ref(topic_metrics), #{ desc => <<"Topic metrics">> })
+                   , 404 => emqx_dashboard_swagger:error_codes([?ERROR_TOPIC], <<"Topic not found">>)
+                   }
             }
-        }
-    },
-    {"/mqtt/topic_metrics/:topic", MetaData, operate_topic_metrics}.
-
-topic_param() ->
-    #{
-        name => topic,
-        in => path,
-        required => true,
-        description => <<"Notice: Topic string url must encode">>,
-        schema => #{type => string}
+     , delete =>
+           #{ description => <<"Remove the topic metrics">>
+            , tags => ?API_TAG_MQTT
+            , parameters => [topic(path)]
+            , responses =>
+                  #{ 204 => <<"Removed topic metrics successfully">>,
+                     404 => emqx_dashboard_swagger:error_codes([?ERROR_TOPIC], <<"Topic not found">>)
+                   }
+            }
+     }.
+
+fields(reset) ->
+    [ {topic
+      , mk( binary()
+          , #{ desc => <<"Topic Name. If this paramter is not present, all created topic metrics will be reseted">>
+             , example => <<"testtopic/1">>
+             , nullable => true})}
+    , {action
+      , mk( string()
+          , #{ desc => <<"Action Name. Only as a \"reset\"">>
+             , enum => [reset]
+             , nullable => false
+             , example => <<"reset">>})}
+    ];
+
+fields(topic_metrics) ->
+    [ { topic
+      , mk( binary()
+          , #{ desc => <<"Topic Name">>
+             , example => <<"testtopic/1">>
+             , nullable => false})},
+      { create_time
+      , mk( emqx_schema:rfc3339_system_time()
+          , #{ desc => <<"Topic Metrics created date time, in rfc3339">>
+             , nullable => false
+             , example => <<"2022-01-14T21:48:47+08:00">>})},
+      { reset_time
+      , mk( emqx_schema:rfc3339_system_time()
+          , #{ desc => <<"Topic Metrics reset date time, in rfc3339. Nullable if never reseted">>
+             , nullable => true
+             , example => <<"2022-01-14T21:48:47+08:00">>})},
+      { metrics
+      , mk( ref(metrics)
+          , #{ desc => <<"Topic Metrics fields">>
+             , nullable => false})
+      }
+    ];
+
+fields(metrics) ->
+    [ { 'messages.dropped.count'
+      , mk( integer(), #{ desc => <<"Message dropped count">>
+                        , example => 0})},
+      { 'messages.dropped.rate'
+      , mk( number(),  #{ desc => <<"Message dropped rate in 5s">>
+                        , example => 0})},
+      { 'messages.in.count'
+      , mk( integer(), #{ desc => <<"Message received count">>
+                        , example => 0})},
+      { 'messages.in.rate'
+      , mk( number(),  #{ desc => <<"Message received rate in 5s">>
+                        , example => 0})},
+      { 'messages.out.count'
+      , mk( integer(), #{ desc => <<"Message sent count">>
+                        , example => 0})},
+      { 'messages.out.rate'
+      , mk( number(),  #{ desc => <<"Message sent rate in 5s">>
+                        , example => 0})},
+      { 'messages.qos0.in.count'
+      , mk( integer(), #{ desc => <<"Message with QoS 0 received count">>
+                        , example => 0})},
+      { 'messages.qos0.in.rate'
+      , mk( number(),  #{ desc => <<"Message with QoS 0 received rate in 5s">>
+                        , example => 0})},
+      { 'messages.qos0.out.count'
+      , mk( integer(), #{ desc => <<"Message with QoS 0 sent count">>
+                        , example => 0})},
+      { 'messages.qos0.out.rate'
+      , mk( number(),  #{ desc => <<"Message with QoS 0 sent rate in 5s">>
+                        , example => 0})},
+      { 'messages.qos1.in.count'
+      , mk( integer(), #{ desc => <<"Message with QoS 1 received count">>
+                        , example => 0})},
+      { 'messages.qos1.in.rate'
+      , mk( number(),  #{ desc => <<"Message with QoS 1 received rate in 5s">>
+                        , example => 0})},
+      { 'messages.qos1.out.count'
+      , mk( integer(), #{ desc => <<"Message with QoS 1 sent count">>
+                        , example => 0})},
+      { 'messages.qos1.out.rate'
+      , mk( number(),  #{ desc => <<"Message with QoS 1 sent rate in 5s">>
+                        , example => 0})},
+      { 'messages.qos2.in.count'
+      , mk( integer(), #{ desc => <<"Message with QoS 2 sent count">>
+                        , example => 0})},
+      { 'messages.qos2.in.rate'
+      , mk( number(),  #{ desc => <<"Message with QoS 2 received rate in 5s">>
+                        , example => 0})},
+      { 'messages.qos2.out.count'
+      , mk( integer(), #{ desc => <<"Message with QoS 2 sent count">>
+                        , example => 0})},
+      { 'messages.qos2.out.rate'
+      , mk( number(),  #{ desc => <<"Message with QoS 2 sent rate in 5s">>
+                        , example => 0})}
+    ].
+
+topic(In) ->
+    case In of
+        body ->
+            Desc = <<"Raw topic string">>,
+            Example = "testtopic/1";
+        path ->
+            Desc = <<"Notice: Topic string in url path must be encoded">>,
+            Example = "testtopic%2F1"
+    end,
+    { topic
+    , mk( binary(),
+          #{ desc => Desc
+           , required => true
+           , in => In
+           , example => Example
+           })
     }.
 
+reset_examples() ->
+    #{ reset_specific_one_topic_metrics =>
+           #{ summary => <<"reset_specific_one_topic_metrics">>
+            , value =>
+                  #{ topic  => "testtopic/1"
+                   , action => "reset"
+                   }
+            }
+     , reset_all_topic_metrics =>
+           #{ summary => <<"reset_all_topic_metrics">>
+            , value =>
+                  #{ action => "reset"
+                   }
+            }
+     }.
+
 %%--------------------------------------------------------------------
 %% HTTP Callbacks
 %%--------------------------------------------------------------------
 
 topic_metrics(get, _) ->
-    case cluster_accumulation_metrics() of
-        {error, Reason} ->
-            {500, Reason};
-        {ok, Metrics} ->
-            {200, Metrics}
-    end;
+    get_cluster_response([]);
 
 topic_metrics(put, #{body := #{<<"topic">> := Topic, <<"action">> := <<"reset">>}}) ->
     case reset(Topic) of
-        ok -> {200};
-        {error, Reason} -> reason2httpresp(Reason)
+        ok ->
+            get_cluster_response([Topic]);
+        {error, Reason} ->
+            reason2httpresp(Reason)
     end;
 topic_metrics(put, #{body := #{<<"action">> := <<"reset">>}}) ->
     reset(),
-    {200};
+    get_cluster_response([]);
 
 topic_metrics(post, #{body := #{<<"topic">> := <<>>}}) ->
     {400, 'BAD_REQUEST', <<"Topic can not be empty">>};
 topic_metrics(post, #{body := #{<<"topic">> := Topic}}) ->
     case emqx_modules_conf:add_topic_metrics(Topic) of
         {ok, Topic} ->
-            {200};
+            get_cluster_response([Topic]);
         {error, Reason} ->
             reason2httpresp(Reason)
     end.
 
 operate_topic_metrics(get, #{bindings := #{topic := Topic0}}) ->
-    case cluster_accumulation_metrics(emqx_http_lib:uri_decode(Topic0)) of
-        {ok, Metrics} ->
-            {200, Metrics};
-        {error, Reason} ->
-            reason2httpresp(Reason)
-    end;
+    get_cluster_response([emqx_http_lib:uri_decode(Topic0)]);
 
 operate_topic_metrics(delete, #{bindings := #{topic := Topic0}}) ->
     case emqx_modules_conf:remove_topic_metrics(emqx_http_lib:uri_decode(Topic0)) of
-        ok -> {200};
+        ok -> {204};
         {error, Reason} -> reason2httpresp(Reason)
     end.
 
@@ -197,7 +292,8 @@ cluster_accumulation_metrics(Topic) ->
         {SuccResList, []} ->
             case lists:filter(fun({error, _}) -> false; (_) -> true
                               end, SuccResList) of
-                [] -> {error, topic_not_found};
+                [] ->
+                    {error, topic_not_found};
                 TopicMetrics ->
                     NTopicMetrics = [ [T] || T <- TopicMetrics],
                     [AccMetrics] = accumulate_nodes_metrics(NTopicMetrics),
@@ -277,8 +373,8 @@ reason2httpresp(bad_topic) ->
 reason2httpresp({quota_exceeded, bad_topic}) ->
     Msg = list_to_binary(
             io_lib:format(
-                "Max topic metrics count is ~p, and topic cannot have wildcard",
-                [emqx_topic_metrics:max_limit()])),
+              "Max topic metrics count is ~p, and topic cannot have wildcard",
+              [emqx_topic_metrics:max_limit()])),
     {400, #{code => ?BAD_REQUEST, message => Msg}};
 reason2httpresp(already_existed) ->
     Msg = <<"Topic already registered">>,
@@ -289,3 +385,13 @@ reason2httpresp(topic_not_found) ->
 reason2httpresp(not_found) ->
     Msg = <<"Topic not found">>,
     {404, #{code => ?ERROR_TOPIC, message => Msg}}.
+
+get_cluster_response(Args) ->
+    case erlang:apply(?MODULE, cluster_accumulation_metrics, Args) of
+        {error, {badrpc, RPCReason}} ->
+            {500, RPCReason};
+        {error, Reason} when is_atom(Reason) ->
+            reason2httpresp(Reason);
+        {ok, Metrics} ->
+            {200, Metrics}
+    end.

+ 1 - 1
apps/emqx_prometheus/rebar.config

@@ -5,7 +5,7 @@
    %% FIXME: tag this as v3.1.3
    {prometheus, {git, "https://github.com/emqx/prometheus.erl", {ref, "9994c76adca40d91a2545102230ccce2423fd8a7"}}},
    {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.22.2"}}},
-   {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.9"}}}
+   {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.10"}}}
  ]}.
 
 {edoc_opts, [{preprocess, true}]}.

+ 73 - 36
apps/emqx_prometheus/src/emqx_prometheus_api.erl

@@ -20,48 +20,56 @@
 
 -include("emqx_prometheus.hrl").
 
--import(emqx_mgmt_util, [ schema/1]).
+-import(hoconsc, [ref/2]).
 
--export([api_spec/0]).
+-export([ api_spec/0
+        , paths/0
+        , schema/1
+        ]).
 
 -export([ prometheus/2
         , stats/2
         ]).
 
+-define(API_TAG_PROMETHEUS, [<<"premetheus">>]).
+-define(SCHEMA_MODULE, emqx_prometheus_schema).
+
+
 api_spec() ->
-    {[prometheus_api(), prometheus_data_api()], []}.
-
-conf_schema() ->
-    emqx_mgmt_api_configs:gen_schema(emqx:get_raw_config([prometheus])).
-
-prometheus_api() ->
-    Metadata = #{
-        get => #{
-            description => <<"Get Prometheus info">>,
-            responses => #{<<"200">> => schema(conf_schema())}
-        },
-        put => #{
-            description => <<"Update Prometheus">>,
-            'requestBody' => schema(conf_schema()),
-            responses => #{<<"200">> => schema(conf_schema())}
-        }
-    },
-    {"/prometheus", Metadata, prometheus}.
-
-prometheus_data_api() ->
-    Metadata = #{
-        get => #{
-            description => <<"Get Prometheus Data">>,
-            responses => #{<<"200">> =>
-                #{content =>
-                #{
-                    'application/json' => #{schema => #{type => object}},
-                    'text/plain' => #{schema => #{type => string}}
-                }}
+    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
+
+paths() ->
+    [ "/prometheus"
+    , "/prometheus/stats"
+    ].
+
+schema("/prometheus") ->
+    #{ 'operationId' => prometheus
+     , get =>
+           #{ description => <<"Get Prometheus config info">>
+            , tags => ?API_TAG_PROMETHEUS
+            , responses =>
+                  #{200 => prometheus_config_schema()}
             }
-        }
-    },
-    {"/prometheus/stats", Metadata, stats}.
+     , put =>
+           #{ description => <<"Update Prometheus config">>
+            , 'requestBody' => prometheus_config_schema()
+            , responses =>
+                  #{200 => prometheus_config_schema()}
+            }
+     };
+schema("/prometheus/stats") ->
+    #{ 'operationId' => stats
+     , get =>
+           #{ description => <<"Get Prometheus Data">>
+            , responses =>
+                  #{200 => prometheus_data_schema()}
+            }
+     }.
+
+%%--------------------------------------------------------------------
+%% API Handler funcs
+%%--------------------------------------------------------------------
 
 prometheus(get, _Params) ->
     {200, emqx:get_raw_config([<<"prometheus">>], #{})};
@@ -83,6 +91,35 @@ stats(get, #{headers := Headers}) ->
         end,
     Data = emqx_prometheus:collect(Type),
     case Type of
-        <<"json">> -> {200, Data};
-        <<"prometheus">> -> {200, #{<<"content-type">> => <<"text/plain">>}, Data}
+        <<"json">> ->
+            {200, Data};
+        <<"prometheus">> ->
+            {200, #{<<"content-type">> => <<"text/plain">>}, Data}
     end.
+
+%%--------------------------------------------------------------------
+%% Internal funcs
+%%--------------------------------------------------------------------
+
+prometheus_config_schema() ->
+    emqx_dashboard_swagger:schema_with_example(
+      ref(?SCHEMA_MODULE, "prometheus"),
+      prometheus_config_example()).
+
+prometheus_config_example() ->
+    #{ enable => true
+     , interval => "15s"
+     , push_gateway_server => <<"http://127.0.0.1:9091">>
+     }.
+
+prometheus_data_schema() ->
+    #{ description => <<"Get Prometheus Data">>
+     , content =>
+           #{ 'application/json' =>
+                  #{ schema => #{type => object}
+                   , description => <<"Prometheus Data in json">>}
+            , 'text/plain' =>
+                  #{ schema => #{type => string}
+                   , description => <<"Prometheus Data in text/plain">>}
+            }
+     }.

+ 3 - 3
apps/emqx_prometheus/src/emqx_prometheus_schema.erl

@@ -28,9 +28,9 @@ namespace() -> "prometheus".
 roots() -> ["prometheus"].
 
 fields("prometheus") ->
-    [ {push_gateway_server, sc(string(), #{})}
-    , {interval, sc(emqx_schema:duration_ms(), #{default => "15s"})}
-    , {enable, sc(boolean(), #{default => false})}
+    [ {push_gateway_server, sc(string(), #{default => "http://127.0.0.1:9091", nullabel => false})}
+    , {interval, sc(emqx_schema:duration_ms(), #{default => "15s", nullabel => false})}
+    , {enable, sc(boolean(), #{default => false, nullabel => false})}
     ].
 
 sc(Type, Meta) -> hoconsc:mk(Type, Meta).

+ 1 - 0
apps/emqx_resource/rebar.config

@@ -15,4 +15,5 @@
            ]}.
 
 {deps, [ {jsx, {git, "https://github.com/talentdeficit/jsx", {tag, "v3.1.0"}}}
+       , {emqx, {path, "../emqx"}}
        ]}.

+ 6 - 6
apps/emqx_resource/src/emqx_resource.erl

@@ -147,7 +147,7 @@ create(InstId, ResourceType, Config) ->
 -spec create(instance_id(), resource_type(), resource_config(), create_opts()) ->
     {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
 create(InstId, ResourceType, Config, Opts) ->
-    cluster_call(create_local, [InstId, ResourceType, Config, Opts]).
+    wrap_rpc(emqx_resource_proto_v1:create(InstId, ResourceType, Config, Opts)).
 
 -spec create_local(instance_id(), resource_type(), resource_config()) ->
     {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
@@ -162,7 +162,7 @@ create_local(InstId, ResourceType, Config, Opts) ->
 -spec create_dry_run(resource_type(), resource_config()) ->
     ok | {error, Reason :: term()}.
 create_dry_run(ResourceType, Config) ->
-    cluster_call(create_dry_run_local, [ResourceType, Config]).
+    wrap_rpc(emqx_resource_proto_v1:create_dry_run(ResourceType, Config)).
 
 -spec create_dry_run_local(resource_type(), resource_config()) ->
     ok | {error, Reason :: term()}.
@@ -172,7 +172,7 @@ create_dry_run_local(ResourceType, Config) ->
 -spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) ->
     {ok, resource_data()} | {error, Reason :: term()}.
 recreate(InstId, ResourceType, Config, Opts) ->
-    cluster_call(recreate_local, [InstId, ResourceType, Config, Opts]).
+    wrap_rpc(emqx_resource_proto_v1:recreate(InstId, ResourceType, Config, Opts)).
 
 -spec recreate_local(instance_id(), resource_type(), resource_config(), create_opts()) ->
     {ok, resource_data()} | {error, Reason :: term()}.
@@ -181,7 +181,7 @@ recreate_local(InstId, ResourceType, Config, Opts) ->
 
 -spec remove(instance_id()) -> ok | {error, Reason :: term()}.
 remove(InstId) ->
-    cluster_call(remove_local, [InstId]).
+    wrap_rpc(emqx_resource_proto_v1:remove(InstId)).
 
 -spec remove_local(instance_id()) -> ok | {error, Reason :: term()}.
 remove_local(InstId) ->
@@ -366,8 +366,8 @@ call_instance(InstId, Query) ->
 safe_apply(Func, Args) ->
     ?SAFE_CALL(erlang:apply(Func, Args)).
 
-cluster_call(Func, Args) ->
-    case emqx_cluster_rpc:multicall(?MODULE, Func, Args) of
+wrap_rpc(Ret) ->
+    case Ret of
         {ok, _TxnId, Result} -> Result;
         Failed -> Failed
     end.

+ 5 - 1
apps/emqx_resource/src/emqx_resource_instance.erl

@@ -192,7 +192,11 @@ do_create_dry_run(ResourceType, Config) ->
     case emqx_resource:call_start(InstId, ResourceType, Config) of
         {ok, ResourceState} ->
             case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of
-                {ok, _} -> ok;
+                {ok, _} ->
+                    case emqx_resource:call_stop(InstId, ResourceType, ResourceState) of
+                        {error, _} = Error -> Error;
+                        _ -> ok
+                    end;
                 {error, Reason, _} -> {error, Reason}
             end;
         {error, Reason} ->

+ 62 - 0
apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl

@@ -0,0 +1,62 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_resource_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([ introduced_in/0
+
+        , create/4
+        , create_dry_run/2
+        , recreate/4
+        , remove/1
+        ]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.0.0".
+
+-spec create( emqx_resource:instance_id()
+            , emqx_resource:resource_type()
+            , emqx_resource:resource_config()
+            , emqx_resource:create_opts()
+            ) ->
+          emqx_cluster_rpc:multicall_return(emqx_resource:resource_data()).
+create(InstId, ResourceType, Config, Opts) ->
+    emqx_cluster_rpc:multicall(emqx_resource, create_local, [InstId, ResourceType, Config, Opts]).
+
+-spec create_dry_run( emqx_resource:resource_type()
+                    , emqx_resource:resource_config()
+                    ) ->
+          emqx_cluster_rpc:multicall_return(emqx_resource:resource_data()).
+create_dry_run(ResourceType, Config) ->
+    emqx_cluster_rpc:multicall(emqx_resource, create_dry_run_local, [ResourceType, Config]).
+
+-spec recreate( emqx_resource:instance_id()
+              , emqx_resource:resource_type()
+              , emqx_resource:resource_config()
+              , emqx_resource:create_opts()
+              ) ->
+          emqx_cluster_rpc:multicall_return(emqx_resource:resource_data()).
+recreate(InstId, ResourceType, Config, Opts) ->
+    emqx_cluster_rpc:multicall(emqx_resource, recreate_local, [InstId, ResourceType, Config, Opts]).
+
+-spec remove(emqx_resource:instance_id()) ->
+          emqx_cluster_rpc:multicall_return(ok).
+remove(InstId) ->
+    emqx_cluster_rpc:multicall(emqx_resource, remove_local, [InstId]).

+ 29 - 11
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -18,7 +18,6 @@
 -compile(nowarn_export_all).
 -compile(export_all).
 
--include("emqx_authn.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
@@ -61,12 +60,12 @@ t_create_remove(_) ->
     {error, _} = emqx_resource:check_and_create_local(
                    ?ID,
                    ?TEST_RESOURCE,
-                   #{unknown => <<"test_resource">>}),
+                   #{unknown => test_resource}),
 
     {ok, _} = emqx_resource:create_local(
                 ?ID,
                 ?TEST_RESOURCE,
-                #{name => <<"test_resource">>}),
+                #{name => test_resource}),
 
     #{pid := Pid} = emqx_resource:query(?ID, get_state),
 
@@ -81,7 +80,7 @@ t_query(_) ->
     {ok, _} = emqx_resource:create_local(
                 ?ID,
                 ?TEST_RESOURCE,
-                #{name => <<"test_resource">>}),
+                #{name => test_resource}),
 
     Pid = self(),
     Success = fun() -> Pid ! success end,
@@ -112,13 +111,19 @@ t_healthy(_) ->
 
     ok = emqx_resource:health_check(?ID),
 
-    [#{status := started}] = emqx_resource:list_instances_verbose(),
+    ?assertMatch(
+        [#{status := started}],
+        emqx_resource:list_instances_verbose()),
 
     erlang:exit(Pid, shutdown),
 
-    {error, dead} = emqx_resource:health_check(?ID),
+    ?assertEqual(
+        {error, dead},
+        emqx_resource:health_check(?ID)),
 
-    [#{status := stopped}] = emqx_resource:list_instances_verbose(),
+    ?assertMatch(
+        [#{status := stopped}],
+        emqx_resource:list_instances_verbose()),
 
     ok = emqx_resource:remove_local(?ID).
 
@@ -126,12 +131,12 @@ t_stop_start(_) ->
     {error, _} = emqx_resource:check_and_create_local(
                    ?ID,
                    ?TEST_RESOURCE,
-                   #{unknown => <<"test_resource">>}),
+                   #{unknown => test_resource}),
 
     {ok, _} = emqx_resource:create_local(
                 ?ID,
                 ?TEST_RESOURCE,
-                #{name => <<"test_resource">>}),
+                #{name => test_resource}),
 
     #{pid := Pid0} = emqx_resource:query(?ID, get_state),
 
@@ -161,10 +166,23 @@ t_list_filter(_) ->
                 #{name => grouped_a}),
 
     [Id1] = emqx_resource:list_group_instances(<<"default">>),
-    {ok, #{config := #{name := a}}} = emqx_resource:get_instance(Id1),
+    ?assertMatch(
+        {ok, #{config := #{name := a}}},
+        emqx_resource:get_instance(Id1)),
 
     [Id2] = emqx_resource:list_group_instances(<<"group">>),
-    {ok, #{config := #{name := grouped_a}}} = emqx_resource:get_instance(Id2).
+    ?assertMatch(
+        {ok, #{config := #{name := grouped_a}}},
+        emqx_resource:get_instance(Id2)).
+
+t_create_dry_run_local(_) ->
+    ?assertEqual(
+       ok,
+       emqx_resource:create_dry_run_local(
+         ?TEST_RESOURCE,
+         #{name => test_resource, register => true})),
+
+    ?assertEqual(undefined, whereis(test_resource)).
 
 %%------------------------------------------------------------------------------
 %% Helpers

+ 16 - 5
apps/emqx_resource/test/emqx_test_resource.erl

@@ -31,16 +31,23 @@
 %% callbacks for emqx_resource config schema
 -export([roots/0]).
 
-roots() -> [{"name", fun name/1}].
+roots() -> [{name, fun name/1},
+            {register, fun register/1}].
 
-name(type) -> binary();
+name(type) -> atom();
 name(nullable) -> false;
 name(_) -> undefined.
 
-on_start(InstId, #{name := Name}) ->
+register(type) -> boolean();
+register(nullable) -> false;
+register(default) -> false;
+register(_) -> undefined.
+
+on_start(InstId, #{name := Name} = Opts) ->
+    Register = maps:get(register, Opts, false),
     {ok, #{name => Name,
            id => InstId,
-           pid => spawn_dummy_process()}}.
+           pid => spawn_dummy_process(Name, Register)}}.
 
 on_stop(_InstId, #{pid := Pid}) ->
     erlang:exit(Pid, shutdown),
@@ -59,9 +66,13 @@ on_health_check(_InstId, State = #{pid := Pid}) ->
 on_config_merge(OldConfig, NewConfig, _Params) ->
     maps:merge(OldConfig, NewConfig).
 
-spawn_dummy_process() ->
+spawn_dummy_process(Name, Register) ->
     spawn(
       fun() ->
+              true = case Register of
+                         true -> register(Name, self());
+                         _ -> true
+                     end,
               Ref = make_ref(),
               receive
                   Ref -> ok

+ 50 - 25
apps/emqx_statsd/src/emqx_statsd_api.erl

@@ -20,36 +20,61 @@
 
 -include("emqx_statsd.hrl").
 
--import(emqx_mgmt_util, [ schema/1
-                        , bad_request/0]).
+-include_lib("typerefl/include/types.hrl").
 
--export([api_spec/0]).
+-import(hoconsc, [mk/2, ref/2]).
 
--export([ statsd/2
+-export([statsd/2]).
+
+-export([ api_spec/0
+        , paths/0
+        , schema/1
         ]).
 
+-define(API_TAG_STATSD, [<<"statsd">>]).
+-define(SCHEMA_MODULE, emqx_statsd_schema).
+
+-define(INTERNAL_ERROR, 'INTERNAL_ERROR').
+
+
 api_spec() ->
-    {statsd_api(), []}.
-
-conf_schema() ->
-    emqx_mgmt_api_configs:gen_schema(emqx:get_raw_config([statsd])).
-
-statsd_api() ->
-    Metadata = #{
-        get => #{
-            description => <<"Get statsd info">>,
-            responses => #{<<"200">> => schema(conf_schema())}
-        },
-        put => #{
-            description => <<"Update Statsd">>,
-            'requestBody' => schema(conf_schema()),
-            responses => #{
-                <<"200">> => schema(conf_schema()),
-                <<"400">> => bad_request()
+    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
+
+paths() ->
+    ["/statsd"].
+
+schema("/statsd") ->
+    #{ 'operationId' => statsd
+     , get =>
+           #{ description => <<"Get statsd config">>
+            , tags => ?API_TAG_STATSD
+            , responses =>
+                  #{200 => statsd_config_schema()}
             }
-        }
-    },
-    [{"/statsd", Metadata, statsd}].
+     , put =>
+           #{ description => <<"Set statsd config">>
+            , tags => ?API_TAG_STATSD
+            , 'requestBody' => statsd_config_schema()
+            , responses =>
+                  #{200 => statsd_config_schema()}
+            }
+     }.
+
+%%--------------------------------------------------------------------
+%% Helper funcs
+%%--------------------------------------------------------------------
+
+statsd_config_schema() ->
+    emqx_dashboard_swagger:schema_with_example(
+      ref(?SCHEMA_MODULE, "statsd"),
+      statsd_example()).
+
+statsd_example() ->
+    #{ enable => true
+     , flush_time_interval => "32s"
+     , sample_time_interval => "32s"
+     , server => "127.0.0.1:8125"
+     }.
 
 statsd(get, _Params) ->
     {200, emqx:get_raw_config([<<"statsd">>], #{})};
@@ -60,5 +85,5 @@ statsd(put, #{body := Body}) ->
             {200, NewConfig};
         {error, Reason} ->
             Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])),
-            {500, 'INTERNAL_ERROR', Message}
+            {500, ?INTERNAL_ERROR, Message}
     end.

+ 17 - 1
apps/emqx_statsd/src/emqx_statsd_schema.erl

@@ -1,3 +1,19 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
 -module(emqx_statsd_schema).
 
 -include_lib("typerefl/include/types.hrl").
@@ -17,7 +33,7 @@ namespace() -> "statsd".
 roots() -> ["statsd"].
 
 fields("statsd") ->
-    [ {enable, hoconsc:mk(boolean(), #{default => false})}
+    [ {enable, hoconsc:mk(boolean(), #{default => false, nullable => false})}
     , {server, fun server/1}
     , {sample_time_interval, fun duration_ms/1}
     , {flush_time_interval,  fun duration_ms/1}

+ 1 - 1
mix.exs

@@ -55,7 +55,7 @@ defmodule EMQXUmbrella.MixProject do
       {:mria, github: "emqx/mria", tag: "0.1.5", override: true},
       {:ekka, github: "emqx/ekka", tag: "0.11.2", override: true},
       {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.0", override: true},
-      {:minirest, github: "emqx/minirest", tag: "1.2.9", override: true},
+      {:minirest, github: "emqx/minirest", tag: "1.2.10", override: true},
       {:ecpool, github: "emqx/ecpool", tag: "0.5.2"},
       {:replayq, "0.3.3", override: true},
       {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},

+ 1 - 1
rebar.config

@@ -56,7 +56,7 @@
     , {mria, {git, "https://github.com/emqx/mria", {tag, "0.1.5"}}}
     , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.2"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.0"}}}
-    , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.9"}}}
+    , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.10"}}}
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
     , {replayq, "0.3.3"}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}