Kaynağa Gözat

Merge pull request #4645 from qzhuyan/dev/william/fix-issue-2985-c3

fix: issue 2985 attempt 3
William Yang 4 yıl önce
ebeveyn
işleme
907502fed0
5 değiştirilmiş dosya ile 59 ekleme ve 4 silme
  1. 10 0
      priv/emqx.schema
  2. 1 1
      src/emqx_broker.erl
  3. 36 2
      src/emqx_router.erl
  4. 4 0
      src/emqx_router_sup.erl
  5. 8 1
      src/emqx_trie.erl

+ 10 - 0
priv/emqx.schema

@@ -2254,6 +2254,16 @@ end}.
   {datatype, flag}
   {datatype, flag}
 ]}.
 ]}.
 
 
+%% @doc performance toggle for subscribe/unsubscribe wildcard topic
+%%      change this toggle only if you have many wildcard topics.
+%% key:   mnesia translational updates with per-key locks. recommended for single node setup.
+%% tab:   mnesia translational updates with table lock. recommended for multi-nodes setup.
+%% global: global lock protected updates. recommended for larger cluster.
+{mapping, "broker.perf.route_lock_type", "emqx.route_lock_type", [
+  {default, key},
+  {datatype, {enum, [key, tab, global]}}
+]}.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% System Monitor
 %% System Monitor
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------

+ 1 - 1
src/emqx_broker.erl

@@ -419,7 +419,7 @@ safe_update_stats(Tab, Stat, MaxStat) ->
 -compile({inline, [call/2, cast/2, pick/1]}).
 -compile({inline, [call/2, cast/2, pick/1]}).
 
 
 call(Broker, Req) ->
 call(Broker, Req) ->
-    gen_server:call(Broker, Req).
+    gen_server:call(Broker, Req, infinity).
 
 
 cast(Broker, Msg) ->
 cast(Broker, Msg) ->
     gen_server:cast(Broker, Msg).
     gen_server:cast(Broker, Msg).

+ 36 - 2
src/emqx_router.erl

@@ -118,7 +118,8 @@ do_add_route(Topic, Dest) when is_binary(Topic) ->
         false ->
         false ->
             ok = emqx_router_helper:monitor(Dest),
             ok = emqx_router_helper:monitor(Dest),
             case emqx_topic:wildcard(Topic) of
             case emqx_topic:wildcard(Topic) of
-                true  -> trans(fun insert_trie_route/1, [Route]);
+                true  ->
+                    maybe_trans(fun insert_trie_route/1, [Route]);
                 false -> insert_direct_route(Route)
                 false -> insert_direct_route(Route)
             end
             end
     end.
     end.
@@ -164,7 +165,8 @@ do_delete_route(Topic) when is_binary(Topic) ->
 do_delete_route(Topic, Dest) ->
 do_delete_route(Topic, Dest) ->
     Route = #route{topic = Topic, dest = Dest},
     Route = #route{topic = Topic, dest = Dest},
     case emqx_topic:wildcard(Topic) of
     case emqx_topic:wildcard(Topic) of
-        true  -> trans(fun delete_trie_route/1, [Route]);
+        true  ->
+            maybe_trans(fun delete_trie_route/1, [Route]);
         false -> delete_direct_route(Route)
         false -> delete_direct_route(Route)
     end.
     end.
 
 
@@ -247,6 +249,24 @@ delete_trie_route(Route = #route{topic = Topic}) ->
     end.
     end.
 
 
 %% @private
 %% @private
+-spec(maybe_trans(function(), list(any())) -> ok | {error, term()}).
+maybe_trans(Fun, Args) ->
+    case persistent_term:get(emqx_route_lock_type) of
+        key ->
+            trans(Fun, Args);
+        global ->
+            lock_router(),
+            try mnesia:sync_dirty(Fun, Args)
+            after
+                unlock_router()
+            end;
+        tab ->
+            trans(fun() ->
+                          emqx_trie:lock_tables(),
+                          apply(Fun, Args)
+                  end, [])
+    end.
+
 -spec(trans(function(), list(any())) -> ok | {error, term()}).
 -spec(trans(function(), list(any())) -> ok | {error, term()}).
 trans(Fun, Args) ->
 trans(Fun, Args) ->
     case mnesia:transaction(Fun, Args) of
     case mnesia:transaction(Fun, Args) of
@@ -254,3 +274,17 @@ trans(Fun, Args) ->
         {aborted, Reason} -> {error, Reason}
         {aborted, Reason} -> {error, Reason}
     end.
     end.
 
 
+lock_router() ->
+    %% if Retry is not 0, global:set_lock could sleep a random time up to 8s.
+    %% Considering we have a limited number of brokers, it is safe to use sleep 1 ms.
+    case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of
+        false ->
+            %% Force to sleep 1ms instead.
+            timer:sleep(1),
+            lock_router();
+        true ->
+            ok
+    end.
+
+unlock_router() ->
+    global:del_lock({?MODULE, self()}).

+ 4 - 0
src/emqx_router_sup.erl

@@ -34,6 +34,10 @@ init([]) ->
                type     => worker,
                type     => worker,
                modules  => [emqx_router_helper]},
                modules  => [emqx_router_helper]},
 
 
+    ok = persistent_term:put(emqx_route_lock_type,
+                             application:get_env(emqx, route_lock_type, key)
+                            ),
+
     %% Router pool
     %% Router pool
     RouterPool = emqx_pool_sup:spec([router_pool, hash,
     RouterPool = emqx_pool_sup:spec([router_pool, hash,
                                      {emqx_router, start_link, []}]),
                                      {emqx_router, start_link, []}]),

+ 8 - 1
src/emqx_trie.erl

@@ -31,7 +31,9 @@
         , delete/1
         , delete/1
         ]).
         ]).
 
 
--export([empty/0]).
+-export([ empty/0
+        , lock_tables/0
+        ]).
 
 
 -ifdef(TEST).
 -ifdef(TEST).
 -compile(export_all).
 -compile(export_all).
@@ -122,6 +124,11 @@ delete(Topic) when is_binary(Topic) ->
 empty() ->
 empty() ->
     ets:info(?TRIE_TAB, size) == 0.
     ets:info(?TRIE_TAB, size) == 0.
 
 
+-spec lock_tables() -> ok.
+lock_tables() ->
+    mnesia:write_lock_table(?TRIE_TAB),
+    mnesia:write_lock_table(?TRIE_NODE_TAB).
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Internal functions
 %% Internal functions
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------