Browse Source

feat(ft): add basic hooks

Ilya Averyanov 3 years ago
parent
commit
d976943f99
4 changed files with 122 additions and 17 deletions
  1. 29 17
      apps/emqx/src/emqx_cm.erl
  2. 4 0
      apps/emqx/src/emqx_types.erl
  3. 88 0
      apps/emqx/src/proto/emqx_cm_proto_v2.erl
  4. 1 0
      mix.exs

+ 29 - 17
apps/emqx/src/emqx_cm.erl

@@ -89,6 +89,7 @@
     mark_channel_connected/1,
     mark_channel_disconnected/1,
     get_connected_client_count/0,
+    takeover_finish/2,
 
     do_kick_session/3,
     do_get_chan_stats/2,
@@ -171,6 +172,7 @@ register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid)
     true = ets:insert(?CHAN_CONN_TAB, {Chan, ConnMod}),
     ok = emqx_cm_registry:register_channel(Chan),
     mark_channel_connected(ChanPid),
+    ok = emqx_hooks:run('channel.registered', [ConnMod, ChanPid]),
     cast({registered, Chan}).
 
 %% @doc Unregister a channel.
@@ -180,11 +182,13 @@ unregister_channel(ClientId) when is_binary(ClientId) ->
     ok.
 
 %% @private
-do_unregister_channel(Chan) ->
+do_unregister_channel({_ClientId, ChanPid} = Chan) ->
     ok = emqx_cm_registry:unregister_channel(Chan),
     true = ets:delete(?CHAN_CONN_TAB, Chan),
     true = ets:delete(?CHAN_INFO_TAB, Chan),
-    ets:delete_object(?CHAN_TAB, Chan).
+    ets:delete_object(?CHAN_TAB, Chan),
+    ok = emqx_hooks:run('channel.unregistered', [ChanPid]),
+    true.
 
 -spec connection_closed(emqx_types:clientid()) -> true.
 connection_closed(ClientId) ->
@@ -212,7 +216,7 @@ do_get_chan_info(ClientId, ChanPid) ->
 -spec get_chan_info(emqx_types:clientid(), chan_pid()) ->
     maybe(emqx_types:infos()).
 get_chan_info(ClientId, ChanPid) ->
-    wrap_rpc(emqx_cm_proto_v1:get_chan_info(ClientId, ChanPid)).
+    wrap_rpc(emqx_cm_proto_v2:get_chan_info(ClientId, ChanPid)).
 
 %% @doc Update infos of the channel.
 -spec set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean().
@@ -242,7 +246,7 @@ do_get_chan_stats(ClientId, ChanPid) ->
 -spec get_chan_stats(emqx_types:clientid(), chan_pid()) ->
     maybe(emqx_types:stats()).
 get_chan_stats(ClientId, ChanPid) ->
-    wrap_rpc(emqx_cm_proto_v1:get_chan_stats(ClientId, ChanPid)).
+    wrap_rpc(emqx_cm_proto_v2:get_chan_stats(ClientId, ChanPid)).
 
 %% @doc Set channel's stats.
 -spec set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean().
@@ -278,7 +282,7 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
         {ok, #{session => Session1, present => false}}
     end,
     emqx_cm_locker:trans(ClientId, CleanStart);
-open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
+open_session(false, ClientInfo = #{clientid := ClientId}, #{conn_mod := NewConnMod} = ConnInfo) ->
     Self = self(),
     ResumeStart = fun(_) ->
         CreateSess =
@@ -304,18 +308,12 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
                 }};
             {living, ConnMod, ChanPid, Session} ->
                 ok = emqx_session:resume(ClientInfo, Session),
-                case
-                    request_stepdown(
-                        {takeover, 'end'},
-                        ConnMod,
-                        ChanPid
-                    )
-                of
-                    {ok, Pendings} ->
+                case wrap_rpc(emqx_cm_proto_v2:takeover_finish(ConnMod, ChanPid)) of
+                    {ok, Pendings, TakoverData} ->
                         Session1 = emqx_persistent_session:persist(
                             ClientInfo, ConnInfo, Session
                         ),
-                        register_channel(ClientId, Self, ConnInfo),
+                        ok = emqx_hooks:run('channel.takeovered', [NewConnMod, Self, TakoverData]),
                         {ok, #{
                             session => clean_session(Session1),
                             present => true,
@@ -400,6 +398,20 @@ takeover_session(ClientId) ->
             takeover_session(ClientId, ChanPid)
     end.
 
+takeover_finish(ConnMod, ChanPid) ->
+    TakoverData = emqx_hooks:run_fold('channel.takeover', [ConnMod, ChanPid], #{}),
+    case
+        %% node-local call
+        request_stepdown(
+            {takeover, 'end'},
+            ConnMod,
+            ChanPid
+        )
+    of
+        {ok, Pendings} -> {ok, Pendings, TakoverData};
+        {error, _} = Error -> Error
+    end.
+
 takeover_session(ClientId, Pid) ->
     try
         do_takeover_session(ClientId, Pid)
@@ -429,7 +441,7 @@ do_takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
             end
     end;
 do_takeover_session(ClientId, ChanPid) ->
-    wrap_rpc(emqx_cm_proto_v1:takeover_session(ClientId, ChanPid)).
+    wrap_rpc(emqx_cm_proto_v2:takeover_session(ClientId, ChanPid)).
 
 %% @doc Discard all the sessions identified by the ClientId.
 -spec discard_session(emqx_types:clientid()) -> ok.
@@ -531,7 +543,7 @@ do_kick_session(Action, ClientId, ChanPid) ->
 %% @private This function is shared for session 'kick' and 'discard' (as the first arg Action).
 kick_session(Action, ClientId, ChanPid) ->
     try
-        wrap_rpc(emqx_cm_proto_v1:kick_session(Action, ClientId, ChanPid))
+        wrap_rpc(emqx_cm_proto_v2:kick_session(Action, ClientId, ChanPid))
     catch
         Error:Reason ->
             %% This should mostly be RPC failures.
@@ -716,7 +728,7 @@ do_get_chann_conn_mod(ClientId, ChanPid) ->
     end.
 
 get_chann_conn_mod(ClientId, ChanPid) ->
-    wrap_rpc(emqx_cm_proto_v1:get_chann_conn_mod(ClientId, ChanPid)).
+    wrap_rpc(emqx_cm_proto_v2:get_chann_conn_mod(ClientId, ChanPid)).
 
 mark_channel_connected(ChanPid) ->
     ?tp(emqx_cm_connected_client_count_inc, #{}),

+ 4 - 0
apps/emqx/src/emqx_types.erl

@@ -101,6 +101,8 @@
 
 -export_type([oom_policy/0]).
 
+-export_type([takeover_data/0]).
+
 -type proto_ver() ::
     ?MQTT_PROTO_V3
     | ?MQTT_PROTO_V4
@@ -242,3 +244,5 @@
     max_heap_size => non_neg_integer(),
     enable => boolean()
 }.
+
+-type takeover_data() :: map().

+ 88 - 0
apps/emqx/src/proto/emqx_cm_proto_v2.erl

@@ -0,0 +1,88 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_cm_proto_v2).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+
+    lookup_client/2,
+    kickout_client/2,
+
+    get_chan_stats/2,
+    get_chan_info/2,
+    get_chann_conn_mod/2,
+
+    takeover_session/2,
+    takeover_finish/2,
+    kick_session/3
+]).
+
+-include("bpapi.hrl").
+-include("src/emqx_cm.hrl").
+
+introduced_in() ->
+    "5.0.0".
+
+-spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}.
+kickout_client(Node, ClientId) ->
+    rpc:call(Node, emqx_cm, kick_session, [ClientId]).
+
+-spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) ->
+    [emqx_cm:channel_info()] | {badrpc, _}.
+lookup_client(Node, Key) ->
+    rpc:call(Node, emqx_cm, lookup_client, [Key]).
+
+-spec get_chan_stats(emqx_types:clientid(), emqx_cm:chan_pid()) -> emqx_types:stats() | {badrpc, _}.
+get_chan_stats(ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_cm, do_get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO * 2).
+
+-spec get_chan_info(emqx_types:clientid(), emqx_cm:chan_pid()) -> emqx_types:infos() | {badrpc, _}.
+get_chan_info(ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_cm, do_get_chan_info, [ClientId, ChanPid], ?T_GET_INFO * 2).
+
+-spec get_chann_conn_mod(emqx_types:clientid(), emqx_cm:chan_pid()) ->
+    module() | undefined | {badrpc, _}.
+get_chann_conn_mod(ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_cm, do_get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO * 2).
+
+-spec takeover_session(emqx_types:clientid(), emqx_cm:chan_pid()) ->
+    none
+    | {expired | persistent, emqx_session:session()}
+    | {living, _ConnMod :: atom(), emqx_cm:chan_pid(), emqx_session:session()}
+    | {badrpc, _}.
+takeover_session(ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_cm, takeover_session, [ClientId, ChanPid], ?T_TAKEOVER * 2).
+
+-spec takeover_finish(module(), emqx_cm:chan_pid()) ->
+    {ok, emqx_type:takeover_data()}
+    | {ok, list(emqx_type:deliver()), emqx_type:takeover_data()}
+    | {error, term()}
+    | {badrpc, _}.
+takeover_finish(ConnMod, ChanPid) ->
+    erpc:call(
+        node(ChanPid),
+        emqx_cm,
+        takeover_session_finish,
+        [ConnMod, ChanPid],
+        ?T_TAKEOVER * 2
+    ).
+
+-spec kick_session(kick | discard, emqx_types:clientid(), emqx_cm:chan_pid()) -> ok | {badrpc, _}.
+kick_session(Action, ClientId, ChanPid) ->
+    rpc:call(node(ChanPid), emqx_cm, do_kick_session, [Action, ClientId, ChanPid], ?T_KICK * 2).

+ 1 - 0
mix.exs

@@ -293,6 +293,7 @@ defmodule EMQXUmbrella.MixProject do
         emqx_psk: :permanent,
         emqx_slow_subs: :permanent,
         emqx_plugins: :permanent,
+        emqx_ft: :permanent,
         emqx_mix: :none
       ] ++
       if(enable_quicer?(), do: [quicer: :permanent], else: []) ++