Forráskód Böngészése

Merge pull request #6725 from k32/bpapi-bridge

refactor(emqx_bridge): Decorate remote procedure calls
k32 4 éve
szülő
commit
82cfbfaaff

+ 3 - 3
apps/emqx/src/bpapi/README.md

@@ -96,9 +96,9 @@ The following limitations apply to these modules:
 1. Once the minor EMQX release stated in `introduced_in()` callback of
 1. Once the minor EMQX release stated in `introduced_in()` callback of
    a module reaches GA, the module is frozen. No changes are allowed
    a module reaches GA, the module is frozen. No changes are allowed
    there, except for adding `deprecated_since()` callback.
    there, except for adding `deprecated_since()` callback.
-2. After the _next_ minor release after the one deprecating the
-   module reaches GA, the module can be removed.
-3. Old versions of the protocol can be dropped in the next major
+2. If the backplane API was deprecated in a release `maj.min.0`, then
+   it can be removed in release `maj.min+1.0`.
+3. Old versions of the protocols can be dropped in the next major
    release.
    release.
 
 
 This way we ensure each minor EMQX release is backward-compatible with
 This way we ensure each minor EMQX release is backward-compatible with

+ 6 - 3
apps/emqx/src/emqx_rpc.erl

@@ -30,9 +30,10 @@
 -export_type([ badrpc/0
 -export_type([ badrpc/0
              , call_result/0
              , call_result/0
              , cast_result/0
              , cast_result/0
+             , multicall_result/1
              , multicall_result/0
              , multicall_result/0
              , erpc/1
              , erpc/1
-             , erpc_multicast/1
+             , erpc_multicall/1
              ]).
              ]).
 
 
 -compile({inline,
 -compile({inline,
@@ -48,7 +49,9 @@
 
 
 -type cast_result() :: true.
 -type cast_result() :: true.
 
 
--type multicall_result() :: {_Results :: [term()], _BadNodes :: [node()]}.
+-type multicall_result(Result) :: {[Result], _BadNodes :: [node()]}.
+
+-type multicall_result() :: multicall_result(term()).
 
 
 -type erpc(Ret) :: {ok, Ret}
 -type erpc(Ret) :: {ok, Ret}
                  | {throw, _Err}
                  | {throw, _Err}
@@ -56,7 +59,7 @@
                  | {error, {exception, _Reason, _Stack :: list()}}
                  | {error, {exception, _Reason, _Stack :: list()}}
                  | {error, {erpc, _Reason}}.
                  | {error, {erpc, _Reason}}.
 
 
--type erpc_multicast(Ret) :: [erpc(Ret)].
+-type erpc_multicall(Ret) :: [erpc(Ret)].
 
 
 -spec call(node(), module(), atom(), list()) -> call_result().
 -spec call(node(), module(), atom(), list()) -> call_result().
 call(Node, Mod, Fun, Args) ->
 call(Node, Mod, Fun, Args) ->

+ 1 - 1
apps/emqx/src/emqx_trace/emqx_trace.erl

@@ -204,7 +204,7 @@ get_trace_filename(Name) ->
         end end,
         end end,
     transaction(Tran).
     transaction(Tran).
 
 
--spec trace_file(File :: list()) ->
+-spec trace_file(File :: file:filename_all()) ->
     {ok, Node :: list(), Binary :: binary()} |
     {ok, Node :: list(), Binary :: binary()} |
     {error, Node :: list(), Reason :: term()}.
     {error, Node :: list(), Reason :: term()}.
 trace_file(File) ->
 trace_file(File) ->

+ 7 - 23
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -29,8 +29,7 @@
 -export(['/bridges'/2, '/bridges/:id'/2,
 -export(['/bridges'/2, '/bridges/:id'/2,
          '/bridges/:id/operation/:operation'/2]).
          '/bridges/:id/operation/:operation'/2]).
 
 
--export([ list_local_bridges/1
-        , lookup_from_local_node/2
+-export([ lookup_from_local_node/2
         ]).
         ]).
 
 
 -define(TYPES, [mqtt, http]).
 -define(TYPES, [mqtt, http]).
@@ -288,12 +287,8 @@ schema("/bridges/:id/operation/:operation") ->
             end
             end
     end;
     end;
 '/bridges'(get, _Params) ->
 '/bridges'(get, _Params) ->
-    {200, zip_bridges([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}.
-
-list_local_bridges(Node) when Node =:= node() ->
-    [format_resp(Data) || Data <- emqx_bridge:list()];
-list_local_bridges(Node) ->
-    rpc_call(Node, list_local_bridges, [Node]).
+    {200, zip_bridges([[format_resp(Data) || Data <- emqx_bridge_proto_v1:list_bridges(Node)]
+                       || Node <- mria_mnesia:running_nodes()])}.
 
 
 '/bridges/:id'(get, #{bindings := #{id := Id}}) ->
 '/bridges/:id'(get, #{bindings := #{id := Id}}) ->
     ?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200));
     ?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200));
@@ -321,7 +316,8 @@ list_local_bridges(Node) ->
         end).
         end).
 
 
 lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
 lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
-    case rpc_multicall(lookup_from_local_node, [BridgeType, BridgeName]) of
+    Nodes = mria_mnesia:running_nodes(),
+    case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
         {ok, [{ok, _} | _] = Results} ->
         {ok, [{ok, _} | _] = Results} ->
             {SuccCode, format_bridge_info([R || {ok, R} <- Results])};
             {SuccCode, format_bridge_info([R || {ok, R} <- Results])};
         {ok, [{error, not_found} | _]} ->
         {ok, [{error, not_found} | _]} ->
@@ -433,9 +429,8 @@ format_metrics(#{
         } }) ->
         } }) ->
     ?METRICS(Match, Succ, Failed + Ex, Rate, Rate5m, RateMax).
     ?METRICS(Match, Succ, Failed + Ex, Rate, Rate5m, RateMax).
 
 
-rpc_multicall(Func, Args) ->
-    Nodes = mria_mnesia:running_nodes(),
-    ResL = erpc:multicall(Nodes, ?MODULE, Func, Args, 15000),
+
+is_ok(ResL) ->
     case lists:filter(fun({ok, _}) -> false; (_) -> true end, ResL) of
     case lists:filter(fun({ok, _}) -> false; (_) -> true end, ResL) of
         [] -> {ok, [Res || {ok, Res} <- ResL]};
         [] -> {ok, [Res || {ok, Res} <- ResL]};
         ErrL -> {error, ErrL}
         ErrL -> {error, ErrL}
@@ -446,17 +441,6 @@ filter_out_request_body(Conf) ->
         <<"node_metrics">>, <<"metrics">>, <<"node">>],
         <<"node_metrics">>, <<"metrics">>, <<"node">>],
     maps:without(ExtraConfs, Conf).
     maps:without(ExtraConfs, Conf).
 
 
-rpc_call(Node, Fun, Args) ->
-    rpc_call(Node, ?MODULE, Fun, Args).
-
-rpc_call(Node, Mod, Fun, Args) when Node =:= node() ->
-    apply(Mod, Fun, Args);
-rpc_call(Node, Mod, Fun, Args) ->
-    case rpc:call(Node, Mod, Fun, Args) of
-        {badrpc, Reason} -> {error, Reason};
-        Res -> Res
-    end.
-
 error_msg(Code, Msg) when is_binary(Msg) ->
 error_msg(Code, Msg) when is_binary(Msg) ->
     #{code => Code, message => Msg};
     #{code => Code, message => Msg};
 error_msg(Code, Msg) ->
 error_msg(Code, Msg) ->

+ 43 - 0
apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl

@@ -0,0 +1,43 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([ introduced_in/0
+
+        , list_bridges/1
+        , lookup_from_all_nodes/3
+        ]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+-define(TIMEOUT, 15000).
+
+introduced_in() ->
+    "5.0.0".
+
+-spec list_bridges(node()) -> list() | emqx_rpc:badrpc().
+list_bridges(Node) ->
+    rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT).
+
+-type key() :: atom() | binary() | [byte()].
+
+-spec lookup_from_all_nodes([node()], key(), key()) ->
+          emqx_rpc:erpc_multicall().
+lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
+    erpc:multicall(Nodes, emqx_bridge_api, lookup_from_local_node, [BridgeType, BridgeName], ?TIMEOUT).

+ 2 - 1
apps/emqx_conf/src/proto/emqx_conf_proto_v1.erl

@@ -69,7 +69,8 @@ update(KeyPath, UpdateReq, Opts) ->
 update(Node, KeyPath, UpdateReq, Opts) ->
 update(Node, KeyPath, UpdateReq, Opts) ->
     rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000).
     rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000).
 
 
--spec remove_config(update_config_key_path(), emqx_config:update_opts()) -> _.
+-spec remove_config(update_config_key_path(), emqx_config:update_opts()) ->
+          emqx_cluster_rpc:multicall_result().
 remove_config(KeyPath, Opts) ->
 remove_config(KeyPath, Opts) ->
     emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]).
     emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]).
 
 

+ 1 - 0
apps/emqx_gateway/test/emqx_lwm2m_SUITE.erl

@@ -1928,6 +1928,7 @@ case100_clients_api(Config) ->
     %% kickout
     %% kickout
     {204, _} =
     {204, _} =
         request(delete, "/gateway/lwm2m/clients/" ++ binary_to_list(ClientId)),
         request(delete, "/gateway/lwm2m/clients/" ++ binary_to_list(ClientId)),
+    timer:sleep(100),
     {200, #{data := []}} = request(get, "/gateway/lwm2m/clients").
     {200, #{data := []}} = request(get, "/gateway/lwm2m/clients").
 
 
 case100_subscription_api(Config) ->
 case100_subscription_api(Config) ->

+ 14 - 8
apps/emqx_management/src/emqx_mgmt_api_trace.erl

@@ -38,7 +38,7 @@
 -export([validate_name/1]).
 -export([validate_name/1]).
 
 
 %% for rpc
 %% for rpc
--export([read_trace_file/3
+-export([ read_trace_file/3
         , get_trace_size/0
         , get_trace_size/0
         ]).
         ]).
 
 
@@ -241,7 +241,7 @@ trace(get, _Params) ->
             List = lists:sort(fun(#{start_at := A}, #{start_at := B}) -> A > B end,
             List = lists:sort(fun(#{start_at := A}, #{start_at := B}) -> A > B end,
                 emqx_trace:format(List0)),
                 emqx_trace:format(List0)),
             Nodes = mria_mnesia:running_nodes(),
             Nodes = mria_mnesia:running_nodes(),
-            TraceSize = cluster_call(?MODULE, get_trace_size, [], 30000),
+            TraceSize = wrap_rpc(emqx_mgmt_trace_proto_v1:get_trace_size(Nodes)),
             AllFileSize = lists:foldl(fun(F, Acc) -> maps:merge(Acc, F) end, #{}, TraceSize),
             AllFileSize = lists:foldl(fun(F, Acc) -> maps:merge(Acc, F) end, #{}, TraceSize),
             Now = erlang:system_time(second),
             Now = erlang:system_time(second),
             Traces =
             Traces =
@@ -333,13 +333,12 @@ group_trace_file(ZipDir, TraceLog, TraceFiles) ->
                 end, [], TraceFiles).
                 end, [], TraceFiles).
 
 
 collect_trace_file(TraceLog) ->
 collect_trace_file(TraceLog) ->
-    cluster_call(emqx_trace, trace_file, [TraceLog], 60000).
-
-cluster_call(Mod, Fun, Args, Timeout) ->
     Nodes = mria_mnesia:running_nodes(),
     Nodes = mria_mnesia:running_nodes(),
-    {GoodRes, BadNodes} = rpc:multicall(Nodes, Mod, Fun, Args, Timeout),
+    wrap_rpc(emqx_mgmt_trace_proto_v1:trace_file(Nodes, TraceLog)).
+
+wrap_rpc({GoodRes, BadNodes}) ->
     BadNodes =/= [] andalso
     BadNodes =/= [] andalso
-        ?SLOG(error, #{msg => "rpc_call_failed", bad_nodes => BadNodes, mfa => {Mod, Fun, Args}}),
+        ?SLOG(error, #{msg => "rpc_call_failed", bad_nodes => BadNodes}),
     GoodRes.
     GoodRes.
 
 
 stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) ->
 stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) ->
@@ -348,7 +347,7 @@ stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) ->
     Bytes = maps:get(<<"bytes">>, Query, 1000),
     Bytes = maps:get(<<"bytes">>, Query, 1000),
     case to_node(Node0) of
     case to_node(Node0) of
         {ok, Node} ->
         {ok, Node} ->
-            case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of
+            case emqx_mgmt_trace_proto_v1:read_trace_file(Node, Name, Position, Bytes) of
                 {ok, Bin} ->
                 {ok, Bin} ->
                     Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes},
                     Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes},
                     {200, #{meta => Meta, items => Bin}};
                     {200, #{meta => Meta, items => Bin}};
@@ -368,6 +367,7 @@ stream_log_file(get, #{bindings := #{name := Name}, query_string := Query}) ->
         {error, not_found} -> {400, #{code => 'NODE_ERROR', message => <<"Node not found">>}}
         {error, not_found} -> {400, #{code => 'NODE_ERROR', message => <<"Node not found">>}}
     end.
     end.
 
 
+-spec get_trace_size() -> #{{node(), file:name_all()} => non_neg_integer()}.
 get_trace_size() ->
 get_trace_size() ->
     TraceDir = emqx_trace:trace_dir(),
     TraceDir = emqx_trace:trace_dir(),
     Node = node(),
     Node = node(),
@@ -381,6 +381,12 @@ get_trace_size() ->
     end.
     end.
 
 
 %% this is an rpc call for stream_log_file/2
 %% this is an rpc call for stream_log_file/2
+-spec read_trace_file( binary()
+                     , non_neg_integer()
+                     , non_neg_integer()
+                     ) -> {ok, binary()}
+                        | {error, _}
+                        | {eof, non_neg_integer()}.
 read_trace_file(Name, Position, Limit) ->
 read_trace_file(Name, Position, Limit) ->
     case emqx_trace:get_trace_filename(Name) of
     case emqx_trace:get_trace_filename(Name) of
         {error, _} = Error -> Error;
         {error, _} = Error -> Error;

+ 51 - 0
apps/emqx_management/src/proto/emqx_mgmt_trace_proto_v1.erl

@@ -0,0 +1,51 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_mgmt_trace_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([ introduced_in/0
+
+        , trace_file/2
+        , get_trace_size/1
+        , read_trace_file/4
+        ]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.0.0".
+
+-spec get_trace_size([node()]) ->
+          emqx_rpc:multicall_result(#{{node(), file:name_all()} => non_neg_integer()}).
+get_trace_size(Nodes) ->
+    rpc:multicall(Nodes, emqx_mgmt_api_trace, get_trace_size, [], 30000).
+
+-spec trace_file([node()], file:name_all()) ->
+          emqx_rpc:multicall_result(
+            {ok, Node :: list(), Binary :: binary()} |
+            {error, Node :: list(), Reason :: term()}).
+trace_file(Nodes, File) ->
+    rpc:multicall(Nodes, emqx_trace, trace_file, [File], 60000).
+
+-spec read_trace_file(node(), binary(), non_neg_integer(), non_neg_integer()) ->
+                {ok, binary()}
+              | {error, _}
+              | {eof, non_neg_integer()}
+              | {badrpc, _}.
+read_trace_file(Node, Name, Position, Limit) ->
+    rpc:call(Node, emqx_mgmt_api_trace, read_trace_file, [Name, Position, Limit]).

+ 1 - 1
apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl

@@ -55,7 +55,7 @@
 -define(SAMPLING, 1).
 -define(SAMPLING, 1).
 -endif.
 -endif.
 
 
--export_type([metrics/0]).
+-export_type([metrics/0, handler_name/0, metric_id/0]).
 
 
 -type rate() :: #{
 -type rate() :: #{
     current => float(),
     current => float(),

+ 36 - 0
apps/emqx_plugin_libs/src/proto/emqx_plugin_libs_proto_v1.erl

@@ -0,0 +1,36 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_plugin_libs_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([ introduced_in/0
+
+        , get_metrics/3
+        ]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+introduced_in() ->
+    "5.0.0".
+
+-spec get_metrics( node()
+                 , emqx_plugin_libs_metrics:handler_name()
+                 , emqx_plugin_libs_metrics:metric_id()
+                 ) -> emqx_plugin_libs_metrics:metrics() | {badrpc, _}.
+get_metrics(Node, HandlerName, MetricId) ->
+    rpc:call(Node, emqx_plugin_libs_metrics, get_metrics, [HandlerName, MetricId]).

+ 1 - 2
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -322,7 +322,7 @@ get_rule_metrics(Id) ->
          , node => Node
          , node => Node
          }
          }
     end,
     end,
-    [Format(Node, rpc:call(Node, emqx_plugin_libs_metrics, get_metrics, [rule_metrics, Id]))
+    [Format(Node, emqx_plugin_libs_proto_v1:get_metrics(Node, rule_metrics, Id))
      || Node <- mria_mnesia:running_nodes()].
      || Node <- mria_mnesia:running_nodes()].
 
 
 aggregate_metrics(AllMetrics) ->
 aggregate_metrics(AllMetrics) ->
@@ -350,4 +350,3 @@ filter_out_request_body(Conf) ->
     ExtraConfs = [<<"id">>, <<"status">>, <<"node_status">>, <<"node_metrics">>,
     ExtraConfs = [<<"id">>, <<"status">>, <<"node_status">>, <<"node_metrics">>,
         <<"metrics">>, <<"node">>],
         <<"metrics">>, <<"node">>],
     maps:without(ExtraConfs, Conf).
     maps:without(ExtraConfs, Conf).
-