Explorar o código

Merge pull request #6678 from k32/broker-bpapi

refactor(emqx_broker): Decorate remote procedure calls
k32 %!s(int64=4) %!d(string=hai) anos
pai
achega
9602ce0250

+ 5 - 3
apps/emqx/src/bpapi/emqx_bpapi_static_checks.erl

@@ -36,8 +36,9 @@
 
 -type param_types() :: #{emqx_bpapi:var_name() => _Type}.
 
-%% Applications we wish to ignore in the analysis:
--define(IGNORED_APPS, "gen_rpc, recon, observer_cli, snabbkaffe, ekka, mria").
+%% Applications and modules we wish to ignore in the analysis:
+-define(IGNORED_APPS, "gen_rpc, recon, redbug, observer_cli, snabbkaffe, ekka, mria").
+-define(IGNORED_MODULES, "emqx_rpc").
 %% List of known RPC backend modules:
 -define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc").
 %% List of functions in the RPC backend modules that we can ignore:
@@ -162,6 +163,7 @@ dump(Opts) ->
     DialyzerDump = collect_signatures(PLT, APIDump),
     Release = emqx_app:get_release(),
     dump_api(#{api => APIDump, signatures => DialyzerDump, release => Release}),
+    xref:stop(?XREF),
     erase(bpapi_ok).
 
 prepare(#{reldir := RelDir, plt := PLT}) ->
@@ -176,7 +178,7 @@ prepare(#{reldir := RelDir, plt := PLT}) ->
     dialyzer_plt:from_file(PLT).
 
 find_remote_calls(_Opts) ->
-    Query = "XC | (A - [" ?IGNORED_APPS "]:App)
+    Query = "XC | (A - [" ?IGNORED_APPS "]:App - [" ?IGNORED_MODULES "] : Mod)
                || ([" ?RPC_MODULES "] : Mod - " ?IGNORED_RPC_CALLS ")",
     {ok, Calls} = xref:q(?XREF, Query),
     ?INFO("Calls to RPC modules ~p", [Calls]),

+ 7 - 15
apps/emqx/src/emqx_broker.erl

@@ -275,28 +275,20 @@ aggre(Routes) ->
 -spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode::sync | async)
     -> emqx_types:deliver_result()).
 forward(Node, To, Delivery, async) ->
-    case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of
-        true -> emqx_metrics:inc('messages.forward');
-        {badrpc, Reason} ->
-            ?SLOG(error, #{
-                msg => "async_forward_msg_to_node_failed",
-                node => Node,
-                reason => Reason
-            }, #{topic => To}),
-            {error, badrpc}
-    end;
-
+    true = emqx_broker_proto_v1:forward_async(Node, To, Delivery),
+    emqx_metrics:inc('messages.forward');
 forward(Node, To, Delivery, sync) ->
-    case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of
-        {badrpc, Reason} ->
+    case emqx_broker_proto_v1:forward(Node, To, Delivery) of
+        {Err, Reason} when Err =:= badrpc; Err =:= badtcp ->
             ?SLOG(error, #{
                 msg => "sync_forward_msg_to_node_failed",
                 node => Node,
-                reason => Reason
+                Err => Reason
             }, #{topic => To}),
             {error, badrpc};
         Result ->
-            emqx_metrics:inc('messages.forward'), Result
+            emqx_metrics:inc('messages.forward'),
+            Result
     end.
 
 -spec(dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).

+ 24 - 4
apps/emqx/src/emqx_rpc.erl

@@ -27,6 +27,12 @@
         , multicall/5
         ]).
 
+-export_type([ badrpc/0
+             , call_result/0
+             , cast_result/0
+             , multicall_result/0
+             ]).
+
 -compile({inline,
           [ rpc_node/1
           , rpc_nodes/1
@@ -34,23 +40,37 @@
 
 -define(DefaultClientNum, 1).
 
+-type badrpc() ::  {badrpc, term()} | {badtcp, term()}.
+
+-type call_result() :: term() | badrpc().
+
+-type cast_result() :: true.
+
+-type multicall_result() :: {_Results :: [term()], _BadNodes :: [node()]}.
+
+-spec call(node(), module(), atom(), list()) -> call_result().
 call(Node, Mod, Fun, Args) ->
     filter_result(gen_rpc:call(rpc_node(Node), Mod, Fun, Args)).
 
+-spec call(term(), node(), module(), atom(), list()) -> call_result().
 call(Key, Node, Mod, Fun, Args) ->
     filter_result(gen_rpc:call(rpc_node({Key, Node}), Mod, Fun, Args)).
 
+-spec multicall([node()], module(), atom(), list()) -> multicall_result().
 multicall(Nodes, Mod, Fun, Args) ->
-    filter_result(gen_rpc:multicall(rpc_nodes(Nodes), Mod, Fun, Args)).
+    gen_rpc:multicall(rpc_nodes(Nodes), Mod, Fun, Args).
 
+-spec multicall(term(), [node()], module(), atom(), list()) -> multicall_result().
 multicall(Key, Nodes, Mod, Fun, Args) ->
-    filter_result(gen_rpc:multicall(rpc_nodes([{Key, Node} || Node <- Nodes]), Mod, Fun, Args)).
+    gen_rpc:multicall(rpc_nodes([{Key, Node} || Node <- Nodes]), Mod, Fun, Args).
 
+-spec cast(node(), module(), atom(), list()) -> cast_result().
 cast(Node, Mod, Fun, Args) ->
-    filter_result(gen_rpc:cast(rpc_node(Node), Mod, Fun, Args)).
+    gen_rpc:cast(rpc_node(Node), Mod, Fun, Args).
 
+-spec cast(term(), node(), module(), atom(), list()) -> cast_result().
 cast(Key, Node, Mod, Fun, Args) ->
-    filter_result(gen_rpc:cast(rpc_node({Key, Node}), Mod, Fun, Args)).
+    gen_rpc:cast(rpc_node({Key, Node}), Mod, Fun, Args).
 
 rpc_node(Node) when is_atom(Node) ->
     {Node, rand:uniform(max_client_num())};

+ 3 - 2
apps/emqx/src/proto/emqx_broker_proto_v1.erl

@@ -29,10 +29,11 @@
 introduced_in() ->
     "5.0.0".
 
--spec forward(node(), emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result().
+-spec forward(node(), emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()
+                                                                  | emqx_rpc:badrpc().
 forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
     emqx_rpc:call(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]).
 
--spec forward_async(node(), emqx_types:topic(), emqx_types:delivery()) -> ok.
+-spec forward_async(node(), emqx_types:topic(), emqx_types:delivery()) -> true.
 forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
     emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]).

+ 0 - 2
apps/emqx/test/props/prop_emqx_rpc.erl

@@ -60,7 +60,6 @@ prop_nodes() ->
          begin
              Nodes = punch(Nodes0),
              case emqx_rpc:multicall(Nodes, erlang, system_time, []) of
-                 {badrpc, _Reason} -> true;
                  {RealResults, RealBadNodes}
                    when is_list(RealResults);
                         is_list(RealBadNodes) ->
@@ -74,7 +73,6 @@ prop_nodes_with_key() ->
          begin
              Nodes = punch(Nodes0),
              case emqx_rpc:multicall(Key, Nodes, erlang, system_time, []) of
-                 {badrpc, _Reason} -> true;
                  {RealResults, RealBadNodes}
                    when is_list(RealResults);
                         is_list(RealBadNodes) ->