Просмотр исходного кода

Merge pull request #6629 from k32/bpapi

feat(bpapi): Add backplane API static checks
k32 4 лет назад
Родитель
Сommit
b6efa2aa9c

+ 6 - 0
apps/emqx/include/bpapi.hrl

@@ -0,0 +1,6 @@
+-ifndef(EMQX_BPAPI_HRL).
+-define(EMQX_BPAPI_HRL, true).
+
+-compile({parse_transform, emqx_bpapi_trans}).
+
+-endif.

+ 0 - 0
apps/emqx/priv/.gitkeep


+ 1 - 1
apps/emqx/rebar.config

@@ -16,7 +16,7 @@
     , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}}
     , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.2"}}}
-    , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
+    , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.0"}}}
     , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.22.2"}}}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
     , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}

+ 39 - 0
apps/emqx/src/bpapi/emqx_bpapi.erl

@@ -0,0 +1,39 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_bpapi).
+
+-export_type([api/0, api_version/0, var_name/0, call/0, rpc/0, bpapi_meta/0]).
+
+-type api() :: atom().
+-type api_version() :: non_neg_integer().
+-type var_name() :: atom().
+-type call() :: {module(), atom(), [var_name()]}.
+-type rpc() :: {_From :: call(), _To :: call()}.
+
+-type bpapi_meta() ::
+        #{ api     := api()
+         , version := api_version()
+         , calls   := [rpc()]
+         , casts   := [rpc()]
+         }.
+
+-callback introduced_in() -> string().
+
+-callback deprecated_since() -> string().
+
+-callback bpapi_meta() -> bpapi_meta().
+
+-optional_callbacks([deprecated_since/0]).

+ 263 - 0
apps/emqx/src/bpapi/emqx_bpapi_static_checks.erl

@@ -0,0 +1,263 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_bpapi_static_checks).
+
+-export([dump/1, dump/0, check_compat/1]).
+
+-include_lib("emqx/include/logger.hrl").
+
+-type api_dump() :: #{{emqx_bpapi:api(), emqx_bpapi:api_version()} =>
+                          #{ calls := [emqx_bpapi:rpc()]
+                           , casts := [emqx_bpapi:rpc()]
+                           }}.
+
+-type dialyzer_spec() :: {_Type, [_Type]}.
+
+-type dialyzer_dump() :: #{mfa() => dialyzer_spec()}.
+
+-type fulldump() :: #{ api        => api_dump()
+                     , signatures => dialyzer_dump()
+                     , release    => string()
+                     }.
+
+-type param_types() :: #{emqx_bpapi:var_name() => _Type}.
+
+%% Applications we wish to ignore in the analysis:
+-define(IGNORED_APPS, "gen_rpc, recon, observer_cli, snabbkaffe, ekka, mria").
+%% List of known RPC backend modules:
+-define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc").
+%% List of functions in the RPC backend modules that we can ignore:
+-define(IGNORED_RPC_CALLS, "gen_rpc:nodes/0").
+
+-define(XREF, myxref).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Functions related to BPAPI compatibility checking
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+-spec check_compat([file:filename()]) -> boolean().
+check_compat(DumpFilenames) ->
+    put(bpapi_ok, true),
+    Dumps = lists:map(fun(FN) ->
+                              {ok, [Dump]} = file:consult(FN),
+                              Dump
+                      end,
+                      DumpFilenames),
+    [check_compat(I, J) || I <- Dumps, J <- Dumps],
+    erase(bpapi_ok).
+
+%% Note: sets nok flag
+-spec check_compat(fulldump(), fulldump()) -> ok.
+check_compat(Dump1, Dump2) ->
+    check_api_immutability(Dump1, Dump2),
+    typecheck_apis(Dump1, Dump2).
+
+%% It's not allowed to change BPAPI modules. Check that no changes
+%% have been made. (sets nok flag)
+-spec check_api_immutability(fulldump(), fulldump()) -> ok.
+check_api_immutability(#{release := Rel1, api := APIs1}, #{release := Rel2, api := APIs2})
+  when Rel2 >= Rel1 ->
+    %% TODO: Handle API deprecation
+    _ = maps:map(
+          fun(Key = {API, Version}, Val) ->
+                  case maps:get(Key, APIs2, undefined) of
+                      Val ->
+                          ok;
+                      undefined ->
+                          setnok(),
+                          ?ERROR("API ~p v~p was removed in release ~p without being deprecated.",
+                                 [API, Version, Rel2]);
+                      _Val ->
+                          setnok(),
+                          ?ERROR("API ~p v~p was changed between ~p and ~p. Backplane API should be immutable.",
+                                 [API, Version, Rel1, Rel2])
+                  end
+          end,
+          APIs1),
+    ok;
+check_api_immutability(_, _) ->
+    ok.
+
+%% Note: sets nok flag
+-spec typecheck_apis(fulldump(), fulldump()) -> ok.
+typecheck_apis( #{release := CallerRelease, api := CallerAPIs, signatures := CallerSigs}
+              , #{release := CalleeRelease, signatures := CalleeSigs}
+              ) ->
+    AllCalls = lists:flatten([[Calls, Casts]
+                              || #{calls := Calls, casts := Casts} <- maps:values(CallerAPIs)]),
+    lists:foreach(fun({From, To}) ->
+                          Caller = get_param_types(CallerSigs, From),
+                          Callee = get_param_types(CalleeSigs, To),
+                          %% TODO: check return types
+                          case typecheck_rpc(Caller, Callee) of
+                              [] ->
+                                  ok;
+                              TypeErrors ->
+                                  setnok(),
+                                  [?ERROR("Incompatible RPC call: "
+                                          "type of the parameter ~p of RPC call ~s on release ~p "
+                                          "is not a subtype of the target function ~s on release ~p",
+                                          [Var, format_call(From), CallerRelease,
+                                           format_call(To), CalleeRelease])
+                                   || Var <- TypeErrors]
+                          end
+                  end,
+                  AllCalls).
+
+-spec typecheck_rpc(param_types(), param_types()) -> [emqx_bpapi:var_name()].
+typecheck_rpc(Caller, Callee) ->
+    maps:fold(fun(Var, CalleeType, Acc) ->
+                      #{Var := CallerType} = Caller,
+                      case erl_types:t_is_subtype(CallerType, CalleeType) of
+                          true  -> Acc;
+                          false -> [Var|Acc]
+                      end
+              end,
+              [],
+              Callee).
+
+-spec get_param_types(dialyzer_dump(), emqx_bpapi:call()) -> param_types().
+get_param_types(Signatures, {M, F, A}) ->
+    Arity = length(A),
+    #{{M, F, Arity} := {_RetType, AttrTypes}} = Signatures,
+    Arity = length(AttrTypes), % assert
+    maps:from_list(lists:zip(A, AttrTypes)).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Functions related to BPAPI dumping
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+dump() ->
+    case {filelib:wildcard("*_plt"), filelib:wildcard("_build/emqx*/lib")} of
+        {[PLT|_], [RelDir|_]} ->
+            dump(#{plt => PLT, reldir => RelDir});
+        _ ->
+            error("failed to guess run options")
+    end.
+
+%% Collect the local BPAPI modules to a dump file
+-spec dump(map()) -> boolean().
+dump(Opts) ->
+    put(bpapi_ok, true),
+    PLT = prepare(Opts),
+    %% First we run XREF to find all callers of any known RPC backend:
+    Callers = find_remote_calls(Opts),
+    {BPAPICalls, NonBPAPICalls} = lists:partition(fun is_bpapi_call/1, Callers),
+    warn_nonbpapi_rpcs(NonBPAPICalls),
+    APIDump = collect_bpapis(BPAPICalls),
+    DialyzerDump = collect_signatures(PLT, APIDump),
+    Release = emqx_app:get_release(),
+    dump_api(#{api => APIDump, signatures => DialyzerDump, release => Release}),
+    erase(bpapi_ok).
+
+prepare(#{reldir := RelDir, plt := PLT}) ->
+    ?INFO("Starting xref...", []),
+    xref:start(?XREF),
+    filelib:wildcard(RelDir ++ "/*/ebin/") =:= [] andalso
+        error("No applications found in the release directory. Wrong directory?"),
+    xref:set_default(?XREF, [{warnings, false}]),
+    xref:add_release(?XREF, RelDir),
+    %% Now to the dialyzer stuff:
+    ?INFO("Loading PLT...", []),
+    dialyzer_plt:from_file(PLT).
+
+find_remote_calls(_Opts) ->
+    Query = "XC | (A - [" ?IGNORED_APPS "]:App)
+               || ([" ?RPC_MODULES "] : Mod - " ?IGNORED_RPC_CALLS ")",
+    {ok, Calls} = xref:q(?XREF, Query),
+    ?INFO("Calls to RPC modules ~p", [Calls]),
+    {Callers, _Callees} = lists:unzip(Calls),
+    Callers.
+
+-spec warn_nonbpapi_rpcs([mfa()]) -> ok.
+warn_nonbpapi_rpcs([]) ->
+    ok;
+warn_nonbpapi_rpcs(L) ->
+    setnok(),
+    lists:foreach(fun({M, F, A}) ->
+                          ?ERROR("~p:~p/~p does a remote call outside of a dedicated "
+                                 "backplane API module. "
+                                 "It may break during rolling cluster upgrade",
+                                 [M, F, A])
+                  end,
+                  L).
+
+-spec is_bpapi_call(mfa()) -> boolean().
+is_bpapi_call({Module, _Function, _Arity}) ->
+    case catch Module:bpapi_meta() of
+        #{api := _} -> true;
+        _           -> false
+    end.
+
+-spec dump_api(fulldump()) -> ok.
+dump_api(Term = #{api := _, signatures := _, release := Release}) ->
+    Filename = filename:join(code:priv_dir(emqx), Release ++ ".bpapi"),
+    file:write_file(Filename, io_lib:format("~0p.", [Term])).
+
+-spec collect_bpapis([mfa()]) -> api_dump().
+collect_bpapis(L) ->
+    Modules = lists:usort([M || {M, _F, _A} <- L]),
+    lists:foldl(fun(Mod, Acc) ->
+                        #{ api     := API
+                         , version := Vsn
+                         , calls   := Calls
+                         , casts   := Casts
+                         } = Mod:bpapi_meta(),
+                        Acc#{{API, Vsn} => #{ calls => Calls
+                                            , casts => Casts
+                                            }}
+                end,
+                #{},
+                Modules).
+
+-spec collect_signatures(_PLT, api_dump()) -> dialyzer_dump().
+collect_signatures(PLT, APIs) ->
+    maps:fold(fun(_APIAndVersion, #{calls := Calls, casts := Casts}, Acc0) ->
+                      Acc1 = lists:foldl(fun enrich/2, {Acc0, PLT}, Calls),
+                      {Acc, PLT} = lists:foldl(fun enrich/2, Acc1, Casts),
+                      Acc
+              end,
+              #{},
+              APIs).
+
+%% Add information about the call types from the PLT
+-spec enrich(emqx_bpapi:rpc(), {dialyzer_dump(), _PLT}) -> {dialyzer_dump(), _PLT}.
+enrich({From0, To0}, {Acc0, PLT}) ->
+    From = call_to_mfa(From0),
+    To   = call_to_mfa(To0),
+    case {dialyzer_plt:lookup(PLT, From), dialyzer_plt:lookup(PLT, To)} of
+        {{value, TFrom}, {value, TTo}} ->
+            Acc = Acc0#{ From => TFrom
+                       , To   => TTo
+                       },
+            {Acc, PLT};
+        {{value, _}, none} ->
+            setnok(),
+            ?CRITICAL("Backplane API function ~s calls a missing remote function ~s",
+                      [format_call(From0), format_call(To0)]),
+            error(missing_target)
+     end.
+
+-spec call_to_mfa(emqx_bpapi:call()) -> mfa().
+call_to_mfa({M, F, A}) ->
+    {M, F, length(A)}.
+
+format_call({M, F, A}) ->
+    io_lib:format("~p:~p/~p", [M, F, length(A)]).
+
+setnok() ->
+    put(bpapi_ok, false).

+ 195 - 0
apps/emqx/src/bpapi/emqx_bpapi_trans.erl

@@ -0,0 +1,195 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @hidden This parse transform generates BPAPI metadata function for
+%% a module, and helps dialyzer typechecking RPC calls
+-module(emqx_bpapi_trans).
+
+-export([parse_transform/2, format_error/1]).
+
+%%-define(debug, true).
+
+-define(META_FUN, bpapi_meta).
+
+-type semantics() :: call | cast.
+
+-record(s,
+        { api              :: emqx_bpapi:api()
+        , module           :: module()
+        , version          :: emqx_bpapi:api_version() | undefined
+        , targets = []     :: [{semantics(), emqx_bpapi:call(), emqx_bpapi:call()}]
+        , errors = []      :: list()
+        , file
+        }).
+
+format_error(invalid_name) ->
+    "BPAPI module name should follow <API>_proto_v<number> pattern";
+format_error({invalid_fun, Name, Arity}) ->
+    io_lib:format("malformed function ~p/~p. "
+                  "BPAPI functions should have exactly one clause "
+                  "and call (emqx_|e)rpc at the top level",
+                  [Name, Arity]).
+
+parse_transform(Forms, _Options) ->
+    log("Original:~n~p", [Forms]),
+    State = #s{file = File} = lists:foldl(fun go/2, #s{}, Forms),
+    log("parse_trans state: ~p", [State]),
+    case check(State) of
+        [] ->
+            finalize(Forms, State);
+        Errors ->
+            {error, [{File, [{Line, ?MODULE, Msg} || {Line, Msg} <- Errors]}], []}
+    end.
+
+%% Scan erl_forms:
+go({attribute, _, file, {File, _}}, S) ->
+    S#s{file = File};
+go({attribute, Line, module, Mod}, S) ->
+    case api_and_version(Mod) of
+        {ok, API, Vsn} -> S#s{api = API, version = Vsn, module = Mod};
+        error          -> push_err(Line, invalid_name, S)
+    end;
+go({function, _Line, introduced_in, 0, _}, S)  ->
+    S;
+go({function, _Line, deprecated_since, 0, _}, S)  ->
+    S;
+go({function, Line, Name, Arity, Clauses}, S) ->
+    analyze_fun(Line, Name, Arity, Clauses, S);
+go(_, S) ->
+    S.
+
+check(#s{errors = Err}) ->
+    %% Post-processing checks can be placed here
+    Err.
+
+finalize(Forms, S) ->
+    {Attrs, Funcs} = lists:splitwith(fun is_attribute/1, Forms),
+    AST = mk_meta_fun(S),
+    log("Meta fun:~n~p", [AST]),
+    Attrs ++ [mk_export()] ++ [AST|Funcs].
+
+mk_meta_fun(#s{api = API, version = Vsn, targets = Targets}) ->
+    Line = 0,
+    Calls = [{From, To} || {call, From, To} <- Targets],
+    Casts = [{From, To} || {cast, From, To} <- Targets],
+    Ret = typerefl_quote:const(Line, #{ api => API
+                                      , version => Vsn
+                                      , calls => Calls
+                                      , casts => Casts
+                                      }),
+    {function, Line, ?META_FUN, _Arity = 0,
+     [{clause, Line, _Args = [], _Guards = [],
+       [Ret]}]}.
+
+mk_export() ->
+    {attribute, 0, export, [{?META_FUN, 0}]}.
+
+is_attribute({attribute, _Line, _Attr, _Val}) -> true;
+is_attribute(_)                               -> false.
+
+%% Extract the target function of the RPC call
+analyze_fun(Line, Name, Arity, [{clause, Line, Head, _Guards, Exprs}], S) ->
+    analyze_exprs(Line, Name, Arity, Head, Exprs, S);
+analyze_fun(Line, Name, Arity, _Clauses, S) ->
+    invalid_fun(Line, Name, Arity, S).
+
+analyze_exprs(Line, Name, Arity, Head, Exprs, S) ->
+    log("~p/~p (~p):~n~p", [Name, Arity, Head, Exprs]),
+    try
+        [{call, _, CallToBackend, CallArgs}] = Exprs,
+        OuterArgs = extract_outer_args(Head),
+        Key = {S#s.module, Name, OuterArgs},
+        {Semantics, Target} = extract_target_call(CallToBackend, CallArgs),
+        push_target({Semantics, Key, Target}, S)
+    catch
+        _:Err:Stack ->
+            log("Failed to process function call:~n~s~nStack: ~p", [Err, Stack]),
+            invalid_fun(Line, Name, Arity, S)
+    end.
+
+-spec extract_outer_args([erl_parse:abstract_form()]) -> [atom()].
+extract_outer_args(Abs) ->
+    lists:map(fun({var, _, Var}) ->
+                      Var;
+                 ({match, _, {var, _, Var}, _}) ->
+                      Var;
+                 ({match, _, _, {var, _, Var}}) ->
+                      Var
+              end,
+              Abs).
+
+-spec extract_target_call(_AST, [_AST]) -> {semantics(), emqx_bpapi:call()}.
+extract_target_call(RPCBackend, OuterArgs) ->
+    {Semantics, {atom, _, M}, {atom, _, F}, A} = extract_mfa(RPCBackend, OuterArgs),
+    {Semantics, {M, F, list_to_args(A)}}.
+
+-define(BACKEND(MOD, FUN), {remote, _, {atom, _, MOD}, {atom, _, FUN}}).
+-define(IS_RPC(MOD), (MOD =:= erpc orelse MOD =:= rpc)).
+
+%% gen_rpc:
+extract_mfa(?BACKEND(gen_rpc, _), _) ->
+    %% gen_rpc has an extremely messy API, thankfully it's fully wrapped
+    %% by emqx_rpc, so we simply forbid direct calls to it:
+    error("direct call to gen_rpc");
+%% emqx_rpc:
+extract_mfa(?BACKEND(emqx_rpc, CallOrCast), [_Node, M, F, A]) ->
+    {call_or_cast(CallOrCast), M, F, A};
+extract_mfa(?BACKEND(emqx_rpc, CallOrCast), [_Tag, _Node, M, F, A]) ->
+    {call_or_cast(CallOrCast), M, F, A};
+%% (e)rpc:
+extract_mfa(?BACKEND(RPC, CallOrCast), [_Node, M, F, A]) when ?IS_RPC(RPC) ->
+    {call_or_cast(CallOrCast), M, F, A};
+extract_mfa(?BACKEND(RPC, CallOrCast), [_Node, M, F, A, _Timeout]) when ?IS_RPC(RPC) ->
+    {call_or_cast(CallOrCast), M, F, A};
+extract_mfa(_, _) ->
+    error("unrecognized RPC call").
+
+call_or_cast(cast)      -> cast;
+call_or_cast(multicast) -> cast;
+call_or_cast(multicall) -> call;
+call_or_cast(call)      -> call.
+
+list_to_args({cons, _, {var, _, A}, T}) ->
+    [A|list_to_args(T)];
+list_to_args({nil, _}) ->
+    [].
+
+invalid_fun(Line, Name, Arity, S) ->
+    push_err(Line, {invalid_fun, Name, Arity}, S).
+
+push_err(Line, Err, S = #s{errors = Errs}) ->
+    S#s{errors = [{Line, Err}|Errs]}.
+
+push_target(Target, S = #s{targets = Targets}) ->
+    S#s{targets = [Target|Targets]}.
+
+-spec api_and_version(module()) -> {ok, emqx_bpapi:api(), emqx_bpapi:version()} | error.
+api_and_version(Module) ->
+    Opts = [{capture, all_but_first, list}],
+    case re:run(atom_to_list(Module), "(.*)_proto_v([0-9]+)$", Opts) of
+        {match, [API, VsnStr]} ->
+            {ok, list_to_atom(API), list_to_integer(VsnStr)};
+        nomatch ->
+            error
+    end.
+
+-ifdef(debug).
+log(Fmt, Args) ->
+    io:format(user, "!! " ++ Fmt ++ "~n", Args).
+-else.
+log(_, _) ->
+    ok.
+-endif.

+ 1 - 1
apps/emqx/src/emqx_broker.erl

@@ -300,7 +300,7 @@ forward(Node, To, Delivery, sync) ->
     end.
 
 -spec(dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
-dispatch(Topic, Delivery) ->
+dispatch(Topic, Delivery = #delivery{}) when is_binary(Topic) ->
     case emqx:is_running() of
         true  ->
             do_dispatch(Topic, Delivery);

+ 9 - 9
apps/emqx/src/emqx_rpc.erl

@@ -14,9 +14,11 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
-%% @doc wrap gen_rpc?
 -module(emqx_rpc).
 
+%% Note: please don't forget to add new API functions to
+%% `emqx_bpapi_trans:extract_mfa'
+
 -export([ call/4
         , call/5
         , cast/4
@@ -30,27 +32,25 @@
           , rpc_nodes/1
           ]}).
 
--define(RPC, gen_rpc).
-
 -define(DefaultClientNum, 1).
 
 call(Node, Mod, Fun, Args) ->
-    filter_result(?RPC:call(rpc_node(Node), Mod, Fun, Args)).
+    filter_result(gen_rpc:call(rpc_node(Node), Mod, Fun, Args)).
 
 call(Key, Node, Mod, Fun, Args) ->
-    filter_result(?RPC:call(rpc_node({Key, Node}), Mod, Fun, Args)).
+    filter_result(gen_rpc:call(rpc_node({Key, Node}), Mod, Fun, Args)).
 
 multicall(Nodes, Mod, Fun, Args) ->
-    filter_result(?RPC:multicall(rpc_nodes(Nodes), Mod, Fun, Args)).
+    filter_result(gen_rpc:multicall(rpc_nodes(Nodes), Mod, Fun, Args)).
 
 multicall(Key, Nodes, Mod, Fun, Args) ->
-    filter_result(?RPC:multicall(rpc_nodes([{Key, Node} || Node <- Nodes]), Mod, Fun, Args)).
+    filter_result(gen_rpc:multicall(rpc_nodes([{Key, Node} || Node <- Nodes]), Mod, Fun, Args)).
 
 cast(Node, Mod, Fun, Args) ->
-    filter_result(?RPC:cast(rpc_node(Node), Mod, Fun, Args)).
+    filter_result(gen_rpc:cast(rpc_node(Node), Mod, Fun, Args)).
 
 cast(Key, Node, Mod, Fun, Args) ->
-    filter_result(?RPC:cast(rpc_node({Key, Node}), Mod, Fun, Args)).
+    filter_result(gen_rpc:cast(rpc_node({Key, Node}), Mod, Fun, Args)).
 
 rpc_node(Node) when is_atom(Node) ->
     {Node, rand:uniform(max_client_num())};

+ 38 - 0
apps/emqx/src/proto/emqx_broker_proto_v1.erl

@@ -0,0 +1,38 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_broker_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([ introduced_in/0
+        , forward/3
+        , forward_async/3
+        ]).
+
+-include("bpapi.hrl").
+-include("emqx.hrl").
+
+introduced_in() ->
+    "5.0.0".
+
+-spec forward(node(), emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result().
+forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
+    emqx_rpc:call(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]).
+
+-spec forward_async(node(), emqx_types:topic(), emqx_types:delivery()) -> ok.
+forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
+    emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]).

+ 1 - 1
mix.exs

@@ -54,7 +54,7 @@ defmodule EMQXUmbrella.MixProject do
       {:esockd, github: "emqx/esockd", tag: "5.9.0", override: true},
       {:mria, github: "emqx/mria", tag: "0.1.5", override: true},
       {:ekka, github: "emqx/ekka", tag: "0.11.2", override: true},
-      {:gen_rpc, github: "emqx/gen_rpc", tag: "2.5.1", override: true},
+      {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.0", override: true},
       {:minirest, github: "emqx/minirest", tag: "1.2.9", override: true},
       {:ecpool, github: "emqx/ecpool", tag: "0.5.2"},
       {:replayq, "0.3.3", override: true},

+ 1 - 1
rebar.config

@@ -55,7 +55,7 @@
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}}
     , {mria, {git, "https://github.com/emqx/mria", {tag, "0.1.5"}}}
     , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.2"}}}
-    , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
+    , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.0"}}}
     , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.9"}}}
     , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
     , {replayq, "0.3.3"}