Jelajahi Sumber

Merge pull request #6831 from k32/bpapi-negitiation

feat(bpapi): Version negotiation
k32 4 tahun lalu
induk
melakukan
c28017eeab

+ 1 - 1
Makefile

@@ -57,7 +57,7 @@ ct: $(REBAR) conf-segs
 
 .PHONY: static_checks
 static_checks:
-	@$(REBAR) as check do xref, dialyzer, ct --suite apps/emqx/test/emqx_bpapi_suite --readable false
+	@$(REBAR) as check do xref, dialyzer, ct --suite apps/emqx/test/emqx_static_checks --readable false
 
 APPS=$(shell $(CURDIR)/scripts/find-apps.sh)
 

+ 17 - 0
apps/emqx/priv/bpapi.versions

@@ -0,0 +1,17 @@
+{emqx,1}.
+{emqx_bridge,1}.
+{emqx_broker,1}.
+{emqx_cm,1}.
+{emqx_conf,1}.
+{emqx_dashboard,1}.
+{emqx_exhook,1}.
+{emqx_gateway_cm,1}.
+{emqx_management,1}.
+{emqx_mgmt_trace,1}.
+{emqx_persistent_session,1}.
+{emqx_plugin_libs,1}.
+{emqx_prometheus,1}.
+{emqx_resource,1}.
+{emqx_statsd,1}.
+{emqx_telemetry,1}.
+{emqx_topic_metrics,1}.

+ 39 - 4
apps/emqx/src/bpapi/README.md

@@ -91,11 +91,11 @@ is_running(Node) ->
     rpc:call(Node, emqx, is_running, []).
 ```
 
-The following limitations apply to these modules:
+## Backplane module life cycle
 
 1. Once the minor EMQX release stated in `introduced_in()` callback of
-   a module reaches GA, the module is frozen. No changes are allowed
-   there, except for adding `deprecated_since()` callback.
+   a module reaches GA, the module is frozen. Only very specific
+   changes are allowed in these modules, see next chapter.
 2. If the backplane API was deprecated in a release `maj.min.0`, then
    it can be removed in release `maj.min+1.0`.
 3. Old versions of the protocols can be dropped in the next major
@@ -104,6 +104,41 @@ The following limitations apply to these modules:
 This way we ensure each minor EMQX release is backward-compatible with
 the previous one.
 
+## Changes to BPAPI modules after GA
+
+Once the backplane API module is frozen, only certain types of changes
+can be made there.
+
+- Adding or removing functions is _forbidden_
+- Changing the RPC target function is _forbidden_
+- Renaming the function parameters should be safe in theory, but
+  currently the static check will complain when it happens
+- Renaming the types of the function parameters and the return type is
+  _allowed_
+- Changing the structure of the function parameters' types is
+  _forbidden_
+
+To clarify the last statement: BPAPI static checks only verify the
+structure of the type, so the following definitions are considered
+equivalent, and replacing one with another is perfectly fine:
+
+```erlang
+-type foo() :: inet:ip6_address().
+
+-type foo() :: {0..65535, 0..65535, 0..65535, 0..65535, 0..65535, 0..65535, 0..65535, 0..65535}.
+```
+
 # Protocol version negotiation
 
-TODO
+`emqx_bpapi` module provides APIs that business applications can use
+to negotiate protocol version:
+
+`emqx_bpapi:supported_version(Node, ProtocolId)` returns maximum
+protocol version supported by the remote node
+`Node`. `emqx_bpapi:supported_version(ProtocolId)` returns maximum
+protocol version that is supported by all nodes in the cluster. It can
+be useful when the protocol involves multicalls or multicasts.
+
+The business logic can assume that the supported protocol version is
+not going to change on the remote node, while it is running. So it is
+free to cache it for the duration of the session.

+ 71 - 0
apps/emqx/src/bpapi/emqx_bpapi.erl

@@ -15,8 +15,15 @@
 %%--------------------------------------------------------------------
 -module(emqx_bpapi).
 
+%% API:
+-export([start/0, announce/1, supported_version/1, supported_version/2,
+         versions_file/1]).
+
 -export_type([api/0, api_version/0, var_name/0, call/0, rpc/0, bpapi_meta/0]).
 
+-include("emqx.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
+
 -type api() :: atom().
 -type api_version() :: non_neg_integer().
 -type var_name() :: atom().
@@ -30,6 +37,8 @@
          , casts   := [rpc()]
          }.
 
+-include("emqx_bpapi.hrl").
+
 -callback introduced_in() -> string().
 
 -callback deprecated_since() -> string().
@@ -37,3 +46,65 @@
 -callback bpapi_meta() -> bpapi_meta().
 
 -optional_callbacks([deprecated_since/0]).
+
+-spec start() -> ok.
+start() ->
+    ok = mria:create_table(?TAB, [ {type, set}
+                                 , {storage, ram_copies}
+                                 , {attributes, record_info(fields, ?TAB)}
+                                 , {rlog_shard, ?COMMON_SHARD}
+                                 ]),
+    ok = mria:wait_for_tables([?TAB]),
+    announce(emqx).
+
+%% @doc Get maximum version of the backplane API supported by the node
+-spec supported_version(node(), api()) -> api_version().
+supported_version(Node, API) ->
+    ets:lookup_element(?TAB, {Node, API}, #?TAB.version).
+
+%% @doc Get maximum version of the backplane API supported by the
+%% entire cluster
+-spec supported_version(api()) -> api_version().
+supported_version(API) ->
+    ets:lookup_element(?TAB, {?multicall, API}, #?TAB.version).
+
+-spec announce(atom()) -> ok.
+announce(App) ->
+    {ok, Data} = file:consult(?MODULE:versions_file(App)),
+    {atomic, ok} = mria:transaction(?COMMON_SHARD, fun announce_fun/1, [Data]),
+    ok.
+
+-spec versions_file(atom()) -> file:filename_all().
+versions_file(App) ->
+    filename:join(code:priv_dir(App), "bpapi.versions").
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+-spec announce_fun([{api(), api_version()}]) -> ok.
+announce_fun(Data) ->
+    %% Delete old records, if present:
+    MS = ets:fun2ms(fun(#?TAB{key = {node(), API}}) ->
+                            {node(), API}
+                    end),
+    OldKeys = mnesia:select(?TAB, MS, write),
+    _ = [mnesia:delete({?TAB, Key})
+         || Key <- OldKeys],
+    %% Insert new records:
+    _ = [mnesia:write(#?TAB{key = {node(), API}, version = Version})
+         || {API, Version} <- Data],
+    %% Update maximum supported version:
+    [update_minimum(API) || {API, _} <- Data],
+    ok.
+
+-spec update_minimum(api()) -> ok.
+update_minimum(API) ->
+    MS = ets:fun2ms(fun(#?TAB{ key     = {N, A}
+                             , version = Value
+                             }) when N =/= ?multicall,
+                                     A =:= API ->
+                            Value
+                    end),
+    MinVersion = lists:min(mnesia:select(?TAB, MS)),
+    mnesia:write(#?TAB{key = {?multicall, API}, version = MinVersion}).

+ 28 - 0
apps/emqx/src/bpapi/emqx_bpapi.hrl

@@ -0,0 +1,28 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-ifndef(EMQX_BPAPI_HRL).
+-define(EMQX_BPAPI_HRL, true).
+
+-define(TAB, bpapi).
+
+-define(multicall, multicall).
+
+-record(?TAB,
+        { key     :: {node() | ?multicall, emqx_bpapi:api()}
+        , version :: emqx_bpapi:api_version()
+        }).
+
+-endif.

+ 1 - 0
apps/emqx/src/emqx_app.erl

@@ -43,6 +43,7 @@ start(_Type, _Args) ->
     ok = maybe_load_config(),
     ok = emqx_persistent_session:init_db_backend(),
     ok = maybe_start_quicer(),
+    ok = emqx_bpapi:start(),
     wait_boot_shards(),
     {ok, Sup} = emqx_sup:start_link(),
     ok = maybe_start_listeners(),

+ 64 - 0
apps/emqx/test/emqx_bpapi_SUITE.erl

@@ -0,0 +1,64 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_bpapi_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("stdlib/include/assert.hrl").
+-include_lib("emqx/src/bpapi/emqx_bpapi.hrl").
+
+all() -> emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    emqx_common_test_helpers:start_apps([emqx]),
+    [mnesia:dirty_write(Rec) || Rec <- fake_records()],
+    Config.
+
+end_per_suite(_Config) ->
+    meck:unload(),
+    [mnesia:dirty_delete({?TAB, Key}) || #?TAB{key = Key} <- fake_records()],
+    emqx_bpapi:announce(emqx),
+    ok.
+
+t_max_supported_version(_Config) ->
+    ?assertMatch(3, emqx_bpapi:supported_version('fake-node2@localhost', api2)),
+    ?assertMatch(2, emqx_bpapi:supported_version(api2)),
+    ?assertError(_, emqx_bpapi:supported_version('fake-node2@localhost', nonexistent_api)),
+    ?assertError(_, emqx_bpapi:supported_version(nonexistent_api)).
+
+t_announce(Config) ->
+    meck:new(emqx_bpapi, [passthrough, no_history]),
+    Filename = filename:join(?config(data_dir, Config), "test.versions"),
+    meck:expect(emqx_bpapi, versions_file, fun(_) -> Filename end),
+    ?assertMatch(ok, emqx_bpapi:announce(emqx)),
+    timer:sleep(100),
+    ?assertMatch(4, emqx_bpapi:supported_version(node(), api2)),
+    ?assertMatch(2, emqx_bpapi:supported_version(node(), api1)),
+    ?assertMatch(2, emqx_bpapi:supported_version(api2)),
+    ?assertMatch(2, emqx_bpapi:supported_version(api1)).
+
+fake_records() ->
+    [ #?TAB{key = {'fake-node@localhost', api1}, version = 2}
+    , #?TAB{key = {'fake-node2@localhost', api1}, version = 2}
+    , #?TAB{key = {?multicall, api1}, version = 2}
+
+    , #?TAB{key = {'fake-node@localhost', api2}, version = 2}
+    , #?TAB{key = {'fake-node2@localhost', api2}, version = 3}
+    , #?TAB{key = {?multicall, api2}, version = 2}
+    ].

+ 2 - 0
apps/emqx/test/emqx_bpapi_SUITE_data/test.versions

@@ -0,0 +1,2 @@
+{api1, 2}.
+{api2, 4}.

+ 18 - 2
apps/emqx/test/emqx_bpapi_static_checks.erl

@@ -16,7 +16,7 @@
 
 -module(emqx_bpapi_static_checks).
 
--export([run/0, dump/1, dump/0, check_compat/1]).
+-export([run/0, dump/1, dump/0, check_compat/1, versions_file/0]).
 
 -include_lib("emqx/include/logger.hrl").
 
@@ -206,6 +206,7 @@ dump(Opts) ->
     DialyzerDump = collect_signatures(PLT, APIDump),
     [Release|_] = string:split(emqx_app:get_release(), "-"),
     dump_api(#{api => APIDump, signatures => DialyzerDump, release => Release}),
+    dump_versions(APIDump),
     xref:stop(?XREF),
     erase(bpapi_ok).
 
@@ -254,6 +255,18 @@ dump_api(Term = #{api := _, signatures := _, release := Release}) ->
     ok = filelib:ensure_dir(Filename),
     file:write_file(Filename, io_lib:format("~0p.", [Term])).
 
+-spec dump_versions(api_dump()) -> ok.
+dump_versions(APIs) ->
+    Filename = versions_file(),
+    ?NOTICE("Dumping API versions to ~p", [Filename]),
+    ok = filelib:ensure_dir(Filename),
+    {ok, FD} = file:open(Filename, [write]),
+    lists:foreach(fun(API) ->
+                          ok = io:format(FD, "~p.~n", [API])
+                  end,
+                  lists:sort(maps:keys(APIs))),
+    file:close(FD).
+
 -spec collect_bpapis([mfa()]) -> api_dump().
 collect_bpapis(L) ->
     Modules = lists:usort([M || {M, _F, _A} <- L]),
@@ -311,7 +324,10 @@ setnok() ->
     put(bpapi_ok, false).
 
 dumps_dir() ->
-    filename:join(project_root_dir(), "apps/emqx/test/emqx_bpapi_suite_data").
+    filename:join(project_root_dir(), "apps/emqx/test/emqx_static_checks_data").
 
 project_root_dir() ->
     string:trim(os:cmd("git rev-parse --show-toplevel")).
+
+versions_file() ->
+    filename:join(project_root_dir(), "apps/emqx/priv/bpapi.versions").

+ 1 - 1
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -164,6 +164,7 @@ set_special_confs(_) ->
     ok.
 
 end_per_suite(_Config) ->
+    emqx_common_test_helpers:ensure_mnesia_stopped(),
     ok.
 
 end_per_group(gc_tests, Config) ->
@@ -1130,4 +1131,3 @@ split([H], L1, L2) ->
     {[H|L1], L2};
 split([H1, H2|Left], L1, L2) ->
     split(Left, [H1|L1], [H2|L2]).
-

+ 17 - 2
apps/emqx/test/emqx_bpapi_suite.erl

@@ -14,7 +14,7 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
--module(emqx_bpapi_suite).
+-module(emqx_static_checks).
 
 -compile(export_all).
 -compile(nowarn_export_all).
@@ -33,4 +33,19 @@ end_per_suite(_Config) ->
             "https://github.com/emqx/emqx/blob/master/apps/emqx/src/bpapi/README.md", []).
 
 t_run_check(_) ->
-    ?assertMatch(true, emqx_bpapi_static_checks:run()).
+    try
+        {ok, OldData} = file:consult(emqx_bpapi_static_checks:versions_file()),
+        ?assert(emqx_bpapi_static_checks:run()),
+        {ok, NewData} = file:consult(emqx_bpapi_static_checks:versions_file()),
+        OldData =:= NewData orelse
+            begin
+                ?CRITICAL("BPAPI versions were changed, but not committed to the repo.\n"
+                          "Run 'make && make static_checks' and then add the changed "
+                          "'bpapi.versions' files to the commit.", []),
+                error(version_mismatch)
+            end
+    catch
+        EC:Err:Stack ->
+            ?CRITICAL("Test suite failed: ~p:~p~nStack:~p", [EC, Err, Stack]),
+            error(tc_failed)
+    end.