Ver código fonte

feat(bpapi): Initial commit

k32 4 anos atrás
pai
commit
5c2a559991

+ 1 - 1
apps/emqx/src/emqx_broker.erl

@@ -300,7 +300,7 @@ forward(Node, To, Delivery, sync) ->
     end.
     end.
 
 
 -spec(dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
 -spec(dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
-dispatch(Topic, Delivery) ->
+dispatch(Topic, Delivery = #delivery{}) when is_binary(Topic) ->
     case emqx:is_running() of
     case emqx:is_running() of
         true  ->
         true  ->
             do_dispatch(Topic, Delivery);
             do_dispatch(Topic, Delivery);

+ 3 - 1
apps/emqx/src/emqx_rpc.erl

@@ -14,9 +14,11 @@
 %% limitations under the License.
 %% limitations under the License.
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
-%% @doc wrap gen_rpc?
 -module(emqx_rpc).
 -module(emqx_rpc).
 
 
+%% Note: please don't forget to add new API functions to
+%% `emqx_bpapi_trans:extract_mfa'
+
 -export([ call/4
 -export([ call/4
         , call/5
         , call/5
         , cast/4
         , cast/4

+ 34 - 0
apps/emqx/src/proto/emqx_broker_proto_v1.erl

@@ -0,0 +1,34 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_broker_proto_v1).
+
+-introduced_in("5.0.0").
+
+-export([ forward/3
+        , forward_async/3
+        ]).
+
+-include_lib("emqx_bpapi/include/bpapi.hrl").
+-include("emqx.hrl").
+
+-spec forward(node(), emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result().
+forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
+    emqx_rpc:call(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]).
+
+-spec forward_async(node(), emqx_types:topic(), emqx_types:delivery()) -> ok.
+forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
+    emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]).

+ 5 - 0
apps/emqx_bpapi/README.md

@@ -0,0 +1,5 @@
+emqx_bpapi
+=====
+
+A library that helps maintaining EMQX's backplane API backward and
+forward compatibility.

+ 6 - 0
apps/emqx_bpapi/include/bpapi.hrl

@@ -0,0 +1,6 @@
+-ifndef(EMQX_BPAPI_HRL).
+-define(EMQX_BPAPI_HRL, true).
+
+-compile({parse_transform, emqx_bpapi_trans}).
+
+-endif.

+ 0 - 0
apps/emqx_bpapi/priv/.gitkeep


+ 15 - 0
apps/emqx_bpapi/src/emqx_bpapi.app.src

@@ -0,0 +1,15 @@
+{application, emqx_bpapi,
+ [{description, "A library for verifying safety of RPC calls"},
+  {vsn, "0.1.0"},
+  {registered, []},
+  {applications,
+   [kernel,
+    stdlib,
+    typerefl %% Just for some metaprogramming utils
+   ]},
+  {env,[]},
+  {modules, []},
+
+  {licenses, ["Apache 2.0"]},
+  {links, []}
+ ]}.

+ 54 - 0
apps/emqx_bpapi/src/emqx_bpapi.erl

@@ -0,0 +1,54 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_bpapi).
+
+-export([parse_semver/1, api_and_version/1]).
+
+-export_type([var_name/0, call/0, rpc/0, bpapi_meta/0]).
+
+-type semver() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}.
+
+-type api() :: atom().
+-type api_version() :: non_neg_integer().
+-type var_name() :: atom().
+-type call() :: {module(), atom(), [var_name()]}.
+-type rpc() :: {_From :: call(), _To :: call()}.
+
+-type bpapi_meta() ::
+        #{ api     := api()
+         , version := api_version()
+         , calls   := [rpc()]
+         , casts   := [rpc()]
+         }.
+
+-spec parse_semver(string()) -> {ok, semver()}
+                              | false.
+parse_semver(Str) ->
+    Opts = [{capture, all_but_first, list}],
+    case re:run(Str, "^([0-9]+)\\.([0-9]+)\\.([0-9]+)$", Opts) of
+        {match, [A, B, C]} -> {ok, {list_to_integer(A), list_to_integer(B), list_to_integer(C)}};
+        nomatch            -> error
+    end.
+
+-spec api_and_version(module()) -> {atom(), non_neg_integer()}.
+api_and_version(Module) ->
+  Opts = [{capture, all_but_first, list}],
+  case re:run(atom_to_list(Module), "(.*)_proto_v([0-9]+)$", Opts) of
+    {match, [API, VsnStr]} ->
+      {ok, list_to_atom(API), list_to_integer(VsnStr)};
+    nomatch ->
+      error(Module)
+  end.

+ 147 - 0
apps/emqx_bpapi/src/emqx_bpapi_static_checks.erl

@@ -0,0 +1,147 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_bpapi_static_checks).
+
+-export([run/1, run/0]).
+
+-include_lib("emqx/include/logger.hrl").
+
+%% `emqx_bpapi:call' enriched with dialyzer spec
+-type typed_call() :: {emqx_bpapi:call(), _DialyzerSpec}.
+-type typed_rpc() :: {typed_call(), typed_call()}.
+
+-type fulldump() :: #{emqx_bpapi:api() =>
+                          #{emqx_bpapi:api_version() =>
+                                #{ calls := [typed_rpc()]
+                                 , casts := [typed_rpc()]
+                                 }
+                           }}.
+
+%% Applications we wish to ignore in the analysis:
+-define(IGNORED_APPS, "gen_rpc, recon, observer_cli, snabbkaffe, ekka, mria").
+%% List of known RPC backend modules:
+-define(RPC_MODULES, "gen_rpc, erpc, rpc, emqx_rpc").
+%% List of functions in the RPC backend modules that we can ignore:
+-define(IGNORED_RPC_CALLS, "gen_rpc:nodes/0").
+
+-define(XREF, myxref).
+
+run() ->
+    case {filelib:wildcard("*_plt"), filelib:wildcard("_build/emqx*/lib")} of
+        {[PLT|_], [RelDir|_]} ->
+            run(#{plt => PLT, reldir => RelDir});
+        _ ->
+            error("failed to guess run options")
+    end.
+
+-spec run(map()) -> boolean().
+run(Opts) ->
+    put(bpapi_ok, true),
+    PLT = prepare(Opts),
+    %% First we run XREF to find all callers of any known RPC backend:
+    Callers = find_remote_calls(Opts),
+    {BPAPICalls, NonBPAPICalls} = lists:partition(fun is_bpapi_call/1, Callers),
+    warn_nonbpapi_rpcs(NonBPAPICalls),
+    CombinedAPI = collect_bpapis(BPAPICalls, PLT),
+    dump_api(CombinedAPI),
+    erase(bpapi_ok).
+
+prepare(#{reldir := RelDir, plt := PLT}) ->
+    ?INFO("Starting xref...", []),
+    xref:start(?XREF),
+    filelib:wildcard(RelDir ++ "/*/ebin/") =:= [] andalso
+        error("No applications found in the release directory. Wrong directory?"),
+    xref:set_default(?XREF, [{warnings, false}]),
+    xref:add_release(?XREF, RelDir),
+    %% Now to the dialyzer stuff:
+    ?INFO("Loading PLT...", []),
+    dialyzer_plt:from_file(PLT).
+
+find_remote_calls(_Opts) ->
+    Query = "XC | (A - [" ?IGNORED_APPS "]:App)
+               || ([" ?RPC_MODULES "] : Mod - " ?IGNORED_RPC_CALLS ")",
+    {ok, Calls} = xref:q(?XREF, Query),
+    ?INFO("Calls to RPC modules ~p", [Calls]),
+    {Callers, _Callees} = lists:unzip(Calls),
+    Callers.
+
+-spec warn_nonbpapi_rpcs([mfa()]) -> ok.
+warn_nonbpapi_rpcs([]) ->
+    ok;
+warn_nonbpapi_rpcs(L) ->
+    setnok(),
+    lists:foreach(fun({M, F, A}) ->
+                          ?ERROR("~p:~p/~p does a remote call outside of a dedicated "
+                                 "backplane API module. "
+                                 "It may break during rolling cluster upgrade",
+                                 [M, F, A])
+                  end,
+                  L).
+
+-spec is_bpapi_call(mfa()) -> boolean().
+is_bpapi_call({Module, _Function, _Arity}) ->
+    case catch Module:bpapi_meta() of
+        #{api := _} -> true;
+        _           -> false
+    end.
+
+-spec dump_api(fulldump()) -> ok.
+dump_api(Term) ->
+    Filename = filename:join(code:priv_dir(emqx_bpapi), emqx_app:get_release() ++ ".bpapi"),
+    file:write_file(Filename, io_lib:format("~0p.", [Term])).
+
+-spec collect_bpapis([mfa()], _PLT) -> fulldump().
+collect_bpapis(L, PLT) ->
+    Modules = lists:usort([M || {M, _F, _A} <- L]),
+    lists:foldl(fun(Mod, Acc) ->
+                        #{ api     := API
+                         , version := Vsn
+                         , calls   := Calls0
+                         , casts   := Casts0
+                         } = Mod:bpapi_meta(),
+                        Calls = enrich(PLT, Calls0),
+                        Casts = enrich(PLT, Casts0),
+                        Acc#{API => #{Vsn => #{ calls => Calls
+                                              , casts => Casts
+                                              }}}
+                end,
+                #{},
+                Modules
+               ).
+
+%% Add information about types from the PLT
+-spec enrich(_PLT, [emqx_bpapi:rpc()]) -> [typed_rpc()].
+enrich(PLT, Calls) ->
+    [case {lookup_type(PLT, From), lookup_type(PLT, To)} of
+         {{value, TFrom}, {value, TTo}} ->
+             {{From, TFrom}, {To, TTo}};
+         {_, none} ->
+             setnok(),
+             ?CRITICAL("Backplane API function ~s calls a missing remote function ~s",
+                       [format_call(From), format_call(To)]),
+             error(missing_target)
+     end
+     || {From, To} <- Calls].
+
+lookup_type(PLT, {M, F, A}) ->
+    dialyzer_plt:lookup(PLT, {M, F, length(A)}).
+
+format_call({M, F, A}) ->
+    io_lib:format("~p:~p/~p", [M, F, length(A)]).
+
+setnok() ->
+    put(bpapi_ok, false).

+ 200 - 0
apps/emqx_bpapi/src/emqx_bpapi_trans.erl

@@ -0,0 +1,200 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @hidden This parse transform generates BPAPI metadata function for
+%% a module, and helps dialyzer typechecking RPC calls
+-module(emqx_bpapi_trans).
+
+-export([parse_transform/2, format_error/1]).
+
+%%-define(debug, true).
+
+-define(META_FUN, bpapi_meta).
+
+-type semantics() :: call | cast.
+
+-record(s,
+        { api              :: atom()
+        , module           :: atom()
+        , version          :: non_neg_integer() | undefined
+        , introduced_in    :: emqx_bpapi:semver() | undefined
+        , deprecated_since :: emqx_bpapi:semver() | undefined
+        , targets = []     :: [{semantics(), emqx_bpapi:call(), emqx_bpapi:call()}]
+        , errors = []      :: [string()]
+        , file
+        }).
+
+format_error(invalid_name) ->
+  "BPAPI module name should follow <API>_proto_v<number> pattern";
+format_error(invalid_introduced_in) ->
+  "-introduced_in attribute should be present and its value should be a semver string";
+format_error(invalid_deprecated_since) ->
+  "value of -deprecated_since attribute should be a semver string";
+format_error({invalid_fun, Name, Arity}) ->
+  io_lib:format("malformed function ~p/~p. "
+                "BPAPI functions should have exactly one clause "
+                "and call (emqx_|e)rpc at the top level",
+                [Name, Arity]).
+
+parse_transform(Forms, _Options) ->
+  log("Original:~n~p", [Forms]),
+  State = #s{file = File} = lists:foldl(fun go/2, #s{}, Forms),
+  log("parse_trans state: ~p", [State]),
+  case check(State) of
+    [] ->
+      finalize(Forms, State);
+    Errors ->
+      {error, [{File, [{Line, ?MODULE, Msg} || {Line, Msg} <- Errors]}], []}
+  end.
+
+%% Scan erl_forms:
+go({attribute, _, file, {File, _}}, S) ->
+  S#s{file = File};
+go({attribute, Line, module, Mod}, S) ->
+  case emqx_bpapi:api_and_version(Mod) of
+    {ok, API, Vsn} -> S#s{api = API, version = Vsn, module = Mod};
+    error          -> push_err(Line, invalid_name, S)
+  end;
+go({attribute, _Line, introduced_in, Str}, S) ->
+  case is_list(Str) andalso emqx_bpapi:parse_semver(Str) of
+    {ok, Vsn} -> S#s{introduced_in = Vsn};
+    false     -> S %% Don't report error here, it's done in check/1
+  end;
+go({attribute, Line, deprecated_since, Str}, S) ->
+  case is_list(Str) andalso emqx_bpapi:parse_semver(Str) of
+    {ok, Vsn} -> S#s{deprecated_since = Vsn};
+    false     -> push_err(Line, invalid_deprecated_since, S)
+  end;
+go({function, Line, Name, Arity, Clauses}, S) ->
+  analyze_fun(Line, Name, Arity, Clauses, S);
+go(_, S) ->
+  S.
+
+check(#s{errors = Err0, introduced_in = II}) ->
+  [{none, invalid_introduced_in} || II =:= undefined] ++
+    Err0.
+
+finalize(Forms, S) ->
+  {Attrs, Funcs} = lists:splitwith(fun is_attribute/1, Forms),
+  AST = mk_meta_fun(S),
+  log("Meta fun:~n~p", [AST]),
+  Attrs ++ [mk_export()] ++ [AST|Funcs].
+
+mk_meta_fun(#s{api = API, version = Vsn, targets = Targets}) ->
+  Line = 0,
+  Calls = [{From, To} || {call, From, To} <- Targets],
+  Casts = [{From, To} || {cast, From, To} <- Targets],
+  Ret = typerefl_quote:const(Line, #{ api => API
+                                    , version => Vsn
+                                    , calls => Calls
+                                    , casts => Casts
+                                    }),
+  {function, Line, ?META_FUN, _Arity = 0,
+   [{clause, Line, _Args = [], _Guards = [],
+     [Ret]}]}.
+
+mk_export() ->
+  {attribute, 0, export, [{?META_FUN, 0}]}.
+
+is_attribute({attribute, _Line, _Attr, _Val}) -> true;
+is_attribute(_)                               -> false.
+
+%% Extract the target function of the RPC call
+analyze_fun(Line, Name, Arity, [{clause, Line, Head, _Guards, Exprs}], S) ->
+  analyze_exprs(Line, Name, Arity, Head, Exprs, S);
+analyze_fun(Line, Name, Arity, _Clauses, S) ->
+  invalid_fun(Line, Name, Arity, S).
+
+analyze_exprs(Line, Name, Arity, Head, Exprs, S) ->
+  log("~p/~p (~p):~n~p", [Name, Arity, Head, Exprs]),
+  try
+    [{call, _, CallToBackend, CallArgs}] = Exprs,
+    OuterArgs = extract_outer_args(Head),
+    Key = {S#s.module, Name, OuterArgs},
+    {Semantics, Target} = extract_target_call(CallToBackend, CallArgs),
+    push_target({Semantics, Key, Target}, S)
+  catch
+    _:Err:Stack ->
+      log("Failed to process function call:~n~s~nStack: ~p", [Err, Stack]),
+      invalid_fun(Line, Name, Arity, S)
+  end.
+
+-spec extract_outer_args(erl_parse:abstract_form()) -> [atom()].
+extract_outer_args(Abs) ->
+  lists:map(fun({var, _, Var}) ->
+                Var;
+               ({match, _, {var, _, Var}, _}) ->
+                Var;
+               ({match, _, _, {var, _, Var}}) ->
+                Var
+            end,
+            Abs).
+
+-spec extract_target_call(Abs, [Abs]) -> {semantics(), emqx_bpapi:call()}
+          when Abs :: erl_parse:abstract_form().
+extract_target_call(RPCBackend, OuterArgs) ->
+  {Semantics, {atom, _, M}, {atom, _, F}, A} = extract_mfa(RPCBackend, OuterArgs),
+  {Semantics, {M, F, list_to_args(A)}}.
+
+-define(BACKEND(MOD, FUN), {remote, _, {atom, _, MOD}, {atom, _, FUN}}).
+-define(IS_RPC(MOD), (MOD =:= erpc orelse MOD =:= rpc)).
+
+-spec extract_mfa(Abs, #s{}) -> {call | cast, Abs, Abs, Abs}
+          when Abs :: erl_parse:abstract_form().
+%% gen_rpc:
+extract_mfa(?BACKEND(gen_rpc, _), _) ->
+  %% gen_rpc has an extremely messy API, thankfully it's fully wrapped
+  %% by emqx_rpc, so we simply forbid direct calls to it:
+  error("direct call to gen_rpc");
+%% emqx_rpc:
+extract_mfa(?BACKEND(emqx_rpc, CallOrCast), [_Node, M, F, A]) ->
+  {call_or_cast(CallOrCast), M, F, A};
+extract_mfa(?BACKEND(emqx_rpc, CallOrCast), [_Tag, _Node, M, F, A]) ->
+  {call_or_cast(CallOrCast), M, F, A};
+%% (e)rpc:
+extract_mfa(?BACKEND(RPC, CallOrCast), [_Node, M, F, A]) when ?IS_RPC(RPC) ->
+  {call_or_cast(CallOrCast), M, F, A};
+extract_mfa(?BACKEND(RPC, CallOrCast), [_Node, M, F, A, _Timeout]) when ?IS_RPC(RPC) ->
+  {call_or_cast(CallOrCast), M, F, A};
+extract_mfa(_, _) ->
+  error("unrecognized RPC call").
+
+call_or_cast(cast)      -> cast;
+call_or_cast(multicast) -> cast;
+call_or_cast(multicall) -> call;
+call_or_cast(call)      -> call.
+
+list_to_args({cons, _, {var, _, A}, T}) ->
+  [A|list_to_args(T)];
+list_to_args({nil, _}) ->
+  [].
+
+invalid_fun(Line, Name, Arity, S) ->
+  push_err(Line, {invalid_fun, Name, Arity}, S).
+
+push_err(Line, Err, S = #s{errors = Errs}) ->
+  S#s{errors = [{Line, Err}|Errs]}.
+
+push_target(Target, S = #s{targets = Targets}) ->
+  S#s{targets = [Target|Targets]}.
+
+-ifdef(debug).
+log(Fmt, Args) ->
+  io:format(user, "!! " ++ Fmt ++ "~n", Args).
+-else.
+log(_, _) ->
+  ok.
+-endif.