Jelajahi Sumber

fix(exclusive): clean-up exclusive subscriptions when a node is down

firest 1 tahun lalu
induk
melakukan
5f94576742

+ 12 - 5
apps/emqx/src/emqx_broker_sup.erl

@@ -23,10 +23,7 @@
 -export([init/1]).
 
 start_link() ->
-    ok = mria:wait_for_tables(
-        emqx_shared_sub:create_tables() ++
-            emqx_exclusive_subscription:create_tables()
-    ),
+    ok = mria:wait_for_tables(emqx_shared_sub:create_tables()),
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 %%--------------------------------------------------------------------
@@ -70,4 +67,14 @@ init([]) ->
         modules => [emqx_broker_helper]
     },
 
-    {ok, {{one_for_all, 0, 1}, [SyncerPool, BrokerPool, SharedSub, Helper]}}.
+    %% exclusive subscription
+    ExclusiveSub = #{
+        id => exclusive_subscription,
+        start => {emqx_exclusive_subscription, start_link, []},
+        restart => permanent,
+        shutdown => 2000,
+        type => worker,
+        modules => [emqx_exclusive_subscription]
+    },
+
+    {ok, {{one_for_all, 0, 1}, [SyncerPool, BrokerPool, SharedSub, Helper, ExclusiveSub]}}.

+ 118 - 23
apps/emqx/src/emqx_exclusive_subscription.erl

@@ -16,16 +16,24 @@
 
 -module(emqx_exclusive_subscription).
 
+-behaviour(gen_server).
+
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
 
--logger_header("[exclusive]").
-
-%% Mnesia bootstrap
--export([create_tables/0]).
+%% API
+-export([start_link/0]).
 
-%% For upgrade
--export([on_add_module/0, on_delete_module/0]).
+%% gen_server callbacks
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2
+]).
 
 -export([
     check_subscribe/2,
@@ -36,12 +44,15 @@
 
 %% Internal exports (RPC)
 -export([
-    try_subscribe/2
+    try_subscribe/3,
+    do_cleanup_subscriptions/1
 ]).
 
 -record(exclusive_subscription, {
     topic :: emqx_types:topic(),
-    clientid :: emqx_types:clientid()
+    clientid :: emqx_types:clientid(),
+    node :: node(),
+    extra = #{} :: map()
 }).
 
 -define(TAB, emqx_exclusive_subscription).
@@ -51,40 +62,60 @@
 %% Mnesia bootstrap
 %%--------------------------------------------------------------------
 
-create_tables() ->
+init_tables() ->
     StoreProps = [
         {ets, [
             {read_concurrency, true},
             {write_concurrency, true}
         ]}
     ],
+
+    Fields = record_info(fields, exclusive_subscription),
+
     ok = mria:create_table(?TAB, [
         {rlog_shard, ?EXCLUSIVE_SHARD},
         {type, set},
         {storage, ram_copies},
         {record_name, exclusive_subscription},
-        {attributes, record_info(fields, exclusive_subscription)},
+        {attributes, Fields},
         {storage_properties, StoreProps}
     ]),
-    [?TAB].
+    ok = mria:wait_for_tables([?TAB]),
+    try_upgrade_table(Fields).
+
+try_upgrade_table(Fields) ->
+    case mnesia:table_info(?TAB, attributes) =:= Fields of
+        true ->
+            ok;
+        false ->
+            TransFun = fun
+                ({exclusive_subscription, Topic, ClientId}) ->
+                    #exclusive_subscription{
+                        topic = Topic,
+                        clientid = ClientId,
+                        node = undefined,
+                        extra = #{}
+                    };
+                (Data = #exclusive_subscription{}) ->
+                    Data
+            end,
+            {atomic, ok} = mnesia:transform_table(?TAB, TransFun, Fields),
+            ok
+    end.
 
 %%--------------------------------------------------------------------
-%% Upgrade
+%% APIs
 %%--------------------------------------------------------------------
 
-on_add_module() ->
-    mria:wait_for_tables(create_tables()).
-
-on_delete_module() ->
-    clear().
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
-%%--------------------------------------------------------------------
-%% APIs
-%%--------------------------------------------------------------------
 -spec check_subscribe(emqx_types:clientinfo(), emqx_types:topic()) ->
     allow | deny.
 check_subscribe(#{clientid := ClientId}, Topic) ->
-    case mria:transaction(?EXCLUSIVE_SHARD, fun ?MODULE:try_subscribe/2, [ClientId, Topic]) of
+    case
+        mria:transaction(?EXCLUSIVE_SHARD, fun ?MODULE:try_subscribe/3, [ClientId, Topic, node()])
+    of
         {atomic, Res} ->
             Res;
         {aborted, Reason} ->
@@ -111,17 +142,60 @@ dirty_lookup_clientid(Topic) ->
 clear() ->
     mria:clear_table(?TAB).
 
+%%--------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------
+
+init([]) ->
+    process_flag(trap_exit, true),
+    init_tables(),
+    ok = ekka:monitor(membership),
+    {ok, #{}}.
+
+handle_call(Request, From, State) ->
+    ?SLOG(warning, #{
+        msg => "unexpected_call",
+        call => Request,
+        from => From
+    }),
+    {reply, ok, State}.
+
+handle_cast(Request, State) ->
+    ?SLOG(warning, #{
+        msg => "unexpected_cast",
+        cast => Request
+    }),
+    {noreply, State}.
+
+handle_info({membership, {mnesia, down, Node}}, State) ->
+    cleanup_subscriptions(Node),
+    {noreply, State};
+handle_info({membership, {node, down, Node}}, State) ->
+    cleanup_subscriptions(Node),
+    {noreply, State};
+handle_info(Info, State) ->
+    ?SLOG(warning, #{
+        msg => "unexpected_info",
+        info => Info
+    }),
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok = ekka:unmonitor(membership).
+
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------
-try_subscribe(ClientId, Topic) ->
+
+try_subscribe(ClientId, Topic, Node) ->
     case mnesia:wread({?TAB, Topic}) of
         [] ->
             mnesia:write(
                 ?TAB,
                 #exclusive_subscription{
                     clientid = ClientId,
-                    topic = Topic
+                    topic = Topic,
+                    node = Node
                 },
                 write
             ),
@@ -136,3 +210,24 @@ try_subscribe(ClientId, Topic) ->
         [_] ->
             deny
     end.
+
+cleanup_subscriptions(Node) ->
+    global:trans(
+        {{?MODULE, ?FUNCTION_NAME}, self()},
+        fun() ->
+            mria:transaction(?EXCLUSIVE_SHARD, fun ?MODULE:do_cleanup_subscriptions/1, [Node])
+        end
+    ).
+
+do_cleanup_subscriptions(Node0) ->
+    Spec = ets:fun2ms(fun(#exclusive_subscription{node = Node} = Data) when
+        Node0 =:= Node
+    ->
+        Data
+    end),
+    lists:foreach(
+        fun(Obj) ->
+            mnesia:delete_object(?TAB, Obj, write)
+        end,
+        mnesia:select(?TAB, Spec, write)
+    ).

+ 2 - 0
changes/ce/fix-13702.en.md

@@ -0,0 +1,2 @@
+Clean up the corresponding exclusive subscriptions when a node goes down.
+