Browse Source

Merge pull request #6796 from k32/bpapi-cm

refactor(emqx_cm): Decorate RPCs
k32 4 years ago
parent
commit
455606fc0e

+ 37 - 28
apps/emqx/src/emqx_cm.erl

@@ -80,9 +80,15 @@
         , mark_channel_connected/1
         , mark_channel_disconnected/1
         , get_connected_client_count/0
+
+        , do_kick_session/3
+        , do_get_chan_stats/2
+        , do_get_chan_info/2
+        , do_get_chann_conn_mod/2
         ]).
 
 -export_type([ channel_info/0
+             , chan_pid/0
              ]).
 
 -type(chan_pid() :: pid()).
@@ -92,6 +98,8 @@
                         , _Stats :: emqx_types:stats()
                         }).
 
+-include("emqx_cm.hrl").
+
 %% Tables for channel management.
 -define(CHAN_TAB, emqx_channel).
 -define(CHAN_CONN_TAB, emqx_channel_conn).
@@ -111,10 +119,6 @@
 %% Server name
 -define(CM, ?MODULE).
 
--define(T_KICK, 5_000).
--define(T_GET_INFO, 5_000).
--define(T_TAKEOVER, 15_000).
-
 %% linting overrides
 -elvis([ {elvis_style, invalid_dynamic_call, #{ignore => [emqx_cm]}}
        , {elvis_style, god_modules, #{ignore => [emqx_cm]}}
@@ -181,16 +185,19 @@ connection_closed(ClientId, ChanPid) ->
 get_chan_info(ClientId) ->
     with_channel(ClientId, fun(ChanPid) -> get_chan_info(ClientId, ChanPid) end).
 
--spec(get_chan_info(emqx_types:clientid(), chan_pid())
+-spec(do_get_chan_info(emqx_types:clientid(), chan_pid())
       -> maybe(emqx_types:infos())).
-get_chan_info(ClientId, ChanPid) when node(ChanPid) == node() ->
+do_get_chan_info(ClientId, ChanPid) ->
     Chan = {ClientId, ChanPid},
     try ets:lookup_element(?CHAN_INFO_TAB, Chan, 2)
     catch
         error:badarg -> undefined
-    end;
+    end.
+
+-spec(get_chan_info(emqx_types:clientid(), chan_pid())
+      -> maybe(emqx_types:infos())).
 get_chan_info(ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid], ?T_GET_INFO).
+    wrap_rpc(emqx_cm_proto_v1:get_chan_info(ClientId, ChanPid)).
 
 %% @doc Update infos of the channel.
 -spec(set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean()).
@@ -206,16 +213,19 @@ set_chan_info(ClientId, Info) when is_binary(ClientId) ->
 get_chan_stats(ClientId) ->
     with_channel(ClientId, fun(ChanPid) -> get_chan_stats(ClientId, ChanPid) end).
 
--spec(get_chan_stats(emqx_types:clientid(), chan_pid())
+-spec(do_get_chan_stats(emqx_types:clientid(), chan_pid())
       -> maybe(emqx_types:stats())).
-get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() ->
+do_get_chan_stats(ClientId, ChanPid) ->
     Chan = {ClientId, ChanPid},
     try ets:lookup_element(?CHAN_INFO_TAB, Chan, 3)
     catch
         error:badarg -> undefined
-    end;
+    end.
+
+-spec(get_chan_stats(emqx_types:clientid(), chan_pid())
+      -> maybe(emqx_types:stats())).
 get_chan_stats(ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO).
+    wrap_rpc(emqx_cm_proto_v1:get_chan_stats(ClientId, ChanPid)).
 
 %% @doc Set channel's stats.
 -spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean()).
@@ -368,7 +378,7 @@ do_takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
             {living, ConnMod, ChanPid, Session}
     end;
 do_takeover_session(ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER).
+    wrap_rpc(emqx_cm_proto_v1:takeover_session(ClientId, ChanPid)).
 
 %% @doc Discard all the sessions identified by the ClientId.
 -spec(discard_session(emqx_types:clientid()) -> ok).
@@ -422,24 +432,20 @@ discard_session(ClientId, ChanPid) ->
 kick_session(ClientId, ChanPid) ->
     kick_session(kick, ClientId, ChanPid).
 
-%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action).
-kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() ->
+-spec do_kick_session(kick | discard, emqx_types:clientid(), chan_pid()) -> ok.
+do_kick_session(Action, ClientId, ChanPid) ->
     case get_chann_conn_mod(ClientId, ChanPid) of
         undefined ->
             %% already deregistered
             ok;
         ConnMod when is_atom(ConnMod) ->
             ok = kick_or_kill(Action, ConnMod, ChanPid)
-    end;
+    end.
+
+%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action).
 kick_session(Action, ClientId, ChanPid) ->
-    %% call remote node on the old APIs because we do not know if they have upgraded
-    %% to have kick_session/3
-    Function = case Action of
-                   discard -> discard_session;
-                   kick -> kick_session
-               end,
     try
-        rpc_call(node(ChanPid), Function, [ClientId, ChanPid], ?T_KICK)
+        wrap_rpc(emqx_cm_proto_v1:kick_session(Action, ClientId, ChanPid))
     catch
         Error : Reason ->
             %% This should mostly be RPC failures.
@@ -525,8 +531,8 @@ lookup_client({clientid, ClientId}) ->
           , Rec <- ets:lookup(emqx_channel_info, Key)].
 
 %% @private
-rpc_call(Node, Fun, Args, Timeout) ->
-    case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of
+wrap_rpc(Result) ->
+    case Result of
         {badrpc, Reason} ->
             %% since emqx app 4.3.10, the 'kick' and 'discard' calls handler
             %% should catch all exceptions and always return 'ok'.
@@ -599,14 +605,17 @@ update_stats({Tab, Stat, MaxStat}) ->
         Size -> emqx_stats:setstat(Stat, MaxStat, Size)
     end.
 
-get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() ->
+-spec do_get_chann_conn_mod(emqx_types:clientid(), chan_pid()) ->
+          module() | undefined.
+do_get_chann_conn_mod(ClientId, ChanPid) ->
     Chan = {ClientId, ChanPid},
     try [ConnMod] = ets:lookup_element(?CHAN_CONN_TAB, Chan, 2), ConnMod
     catch
         error:badarg -> undefined
-    end;
+    end.
+
 get_chann_conn_mod(ClientId, ChanPid) ->
-    rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO).
+    wrap_rpc(emqx_cm_proto_v1:get_chann_conn_mod(ClientId, ChanPid)).
 
 mark_channel_connected(ChanPid) ->
     ?tp(emqx_cm_connected_client_count_inc, #{}),

+ 23 - 0
apps/emqx/src/emqx_cm.hrl

@@ -0,0 +1,23 @@
+%%-------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-ifndef(EMQX_CM_HRL).
+-define(EMQX_CM_HRL, true).
+
+-define(T_KICK, 5_000).
+-define(T_GET_INFO, 5_000).
+-define(T_TAKEOVER, 15_000).
+
+-endif.

+ 0 - 12
apps/emqx/src/proto/emqx_broker_proto_v1.erl

@@ -25,9 +25,6 @@
         , list_client_subscriptions/2
         , list_subscriptions_via_topic/2
 
-        , lookup_client/2
-        , kickout_client/2
-
         , start_listener/2
         , stop_listener/2
         , restart_listener/2
@@ -48,15 +45,6 @@ forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
 forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
     emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]).
 
--spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}.
-kickout_client(Node, ClientId) ->
-    rpc:call(Node, emqx_cm, kick_session, [ClientId]).
-
--spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) ->
-          [emqx_cm:channel_info()] | {badrpc, _}.
-lookup_client(Node, Key) ->
-    rpc:call(Node, emqx_cm, lookup_client, [Key]).
-
 -spec list_client_subscriptions(node(), emqx_types:clientid()) ->
                 [{emqx_types:topic(), emqx_types:subopts()}]
               | emqx_rpc:badrpc().

+ 71 - 0
apps/emqx/src/proto/emqx_cm_proto_v1.erl

@@ -0,0 +1,71 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_cm_proto_v1).
+
+-behaviour(emqx_bpapi).
+
+-export([ introduced_in/0
+
+        , lookup_client/2
+        , kickout_client/2
+
+        , get_chan_stats/2
+        , get_chan_info/2
+        , get_chann_conn_mod/2
+
+        , takeover_session/2
+        , kick_session/3
+        ]).
+
+-include("bpapi.hrl").
+-include("emqx_cm.hrl").
+
+introduced_in() ->
+    "5.0.0".
+
+-spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}.
+kickout_client(Node, ClientId) ->
+    rpc:call(Node, emqx_cm, kick_session, [ClientId]).
+
+-spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) ->
+          [emqx_cm:channel_info()] | {badrpc, _}.
+lookup_client(Node, Key) ->
+    rpc:call(Node, emqx_cm, lookup_client, [Key]).
+
+-spec get_chan_stats(emqx_types:clientid(), emqx_cm:chan_pid()) -> emqx_types:stats() | {badrpc, _}.
+get_chan_stats(ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_cm, do_get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO * 2).
+
+-spec get_chan_info(emqx_types:clientid(), emqx_cm:chan_pid()) -> emqx_types:infos() | {badrpc, _}.
+get_chan_info(ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_cm, do_get_chan_info, [ClientId, ChanPid], ?T_GET_INFO * 2).
+
+-spec get_chann_conn_mod(emqx_types:clientid(), emqx_cm:chan_pid()) -> module() | undefined | {badrpc, _}.
+get_chann_conn_mod(ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_cm, do_get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO * 2).
+
+-spec takeover_session(emqx_types:clientid(), emqx_cm:chan_pid()) ->
+                none
+              | {expired | persistent, emqx_session:session()}
+              | {living, _ConnMod :: atom(), emqx_cm:chan_pid(), emqx_session:session()}
+              | {badrpc, _}.
+takeover_session(ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_cm, takeover_session, [ClientId, ChanPid], ?T_TAKEOVER * 2).
+
+-spec kick_session(kick | discard, emqx_types:clientid(), emqx_cm:chan_pid()) -> ok | {badrpc, _}.
+kick_session(Action, ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_cm, do_kick_session, [Action, ClientId, ChanPid], ?T_KICK * 2).

+ 6 - 1
apps/emqx/test/emqx_bpapi_static_checks.erl

@@ -52,6 +52,11 @@
 -define(RPC_FUNCTIONS, "emqx_cluster_rpc:multicall/3, emqx_cluster_rpc:multicall/5").
 %% List of functions in the RPC backend modules that we can ignore:
 -define(IGNORED_RPC_CALLS, "gen_rpc:nodes/0").
+%% List of business-layer functions that are exempt from the checks:
+-define(EXEMPTIONS, "emqx_mgmt_api:do_query/6"  % Reason: legacy code. A fun and a QC query are
+                                                % passed in the args, it's futile to try to statically
+                                                % check it
+       ).
 
 -define(XREF, myxref).
 
@@ -207,7 +212,7 @@ prepare(#{reldir := RelDir, plt := PLT}) ->
     dialyzer_plt:from_file(PLT).
 
 find_remote_calls(_Opts) ->
-    Query = "XC | (A - [" ?IGNORED_APPS "]:App - [" ?IGNORED_MODULES "] : Mod)
+    Query = "XC | (A - [" ?IGNORED_APPS "]:App - [" ?IGNORED_MODULES "]:Mod - [" ?EXEMPTIONS "])
                || (([" ?RPC_MODULES "] : Mod + [" ?RPC_FUNCTIONS "]) - " ?IGNORED_RPC_CALLS ")",
     {ok, Calls} = xref:q(?XREF, Query),
     ?INFO("Calls to RPC modules ~p", [Calls]),

+ 3 - 5
apps/emqx/test/emqx_cm_SUITE.erl

@@ -283,6 +283,7 @@ flush_emqx_pool() ->
 t_discard_session_race(_) ->
     ClientId = rand_client_id(),
     ?check_trace(
+       #{timetrap => 60000},
        begin
          #{conninfo := ConnInfo0} = ?ChanInfo,
          ConnInfo = ConnInfo0#{conn_mod := emqx_ws_connection},
@@ -290,12 +291,9 @@ t_discard_session_race(_) ->
          ok = emqx_cm:register_channel(ClientId, Pid, ConnInfo),
          Pid ! stop,
          receive {'DOWN', Ref, process, Pid, normal} -> ok end,
-         ok = emqx_cm:discard_session(ClientId),
-         {ok, _} = ?block_until(#{?snk_kind := "session_already_gone", pid := Pid}, 1000)
+         ?assertMatch(ok, emqx_cm:discard_session(ClientId))
        end,
-       fun(_, _) ->
-               true
-       end).
+       []).
 
 t_takeover_session(_) ->
     #{conninfo := ConnInfo} = ?ChanInfo,

+ 2 - 2
apps/emqx_management/src/emqx_mgmt.erl

@@ -243,7 +243,7 @@ lookup_client({username, Username}, FormatFun) ->
                   || Node <- mria_mnesia:running_nodes()]).
 
 lookup_client(Node, Key, {M, F}) ->
-    case wrap_rpc(emqx_broker_proto_v1:lookup_client(Node, Key)) of
+    case wrap_rpc(emqx_cm_proto_v1:lookup_client(Node, Key)) of
         {error, Err} -> {error, Err};
         L            -> lists:map(fun({Chan, Info0, Stats}) ->
                                           Info = Info0#{node => Node},
@@ -262,7 +262,7 @@ kickout_client({ClientID, FormatFun}) ->
     end.
 
 kickout_client(Node, ClientId) ->
-    wrap_rpc(emqx_broker_proto_v1:kickout_client(Node, ClientId)).
+    wrap_rpc(emqx_cm_proto_v1:kickout_client(Node, ClientId)).
 
 list_authz_cache(ClientId) ->
     call_client(ClientId, list_authz_cache).

+ 6 - 10
apps/emqx_management/src/emqx_mgmt_api.erl

@@ -197,12 +197,15 @@ do_cluster_query([Node | Tail] = Nodes, Tab, Qs, QueryFun, Continuation,
 %% Do Query (or rpc query)
 %%--------------------------------------------------------------------
 
-%% @private
+%% @private This function is exempt from BPAPI
 do_query(Node, Tab, Qs, {M,F}, Continuation, Limit) when Node =:= node() ->
     erlang:apply(M, F, [Tab, Qs, Continuation, Limit]);
 do_query(Node, Tab, Qs, QueryFun, Continuation, Limit) ->
-    rpc_call(Node, ?MODULE, do_query,
-             [Node, Tab, Qs, QueryFun, Continuation, Limit], 50000).
+    case rpc:call(Node, ?MODULE, do_query,
+                  [Node, Tab, Qs, QueryFun, Continuation, Limit], 50000) of
+        {badrpc, _} = R -> {error, R};
+        Ret -> Ret
+    end.
 
 sub_query_result(Len, Rows, Limit, Results, Meta) ->
     {Flag, NMeta} = judge_page_with_counting(Len, Meta),
@@ -219,13 +222,6 @@ sub_query_result(Len, Rows, Limit, Results, Meta) ->
         end,
     {NMeta, NResults}.
 
-%% @private
-rpc_call(Node, M, F, A, T) ->
-    case rpc:call(Node, M, F, A, T) of
-        {badrpc, _} = R -> {error, R};
-        Res -> Res
-    end.
-
 %%--------------------------------------------------------------------
 %% Table Select
 %%--------------------------------------------------------------------