Jelajahi Sumber

Merge pull request #4711 from qzhuyan/dev/william/fix-issue-2671-c2

Dev/william/fix issue 2671 c2
William Yang 5 tahun lalu
induk
melakukan
3c103ae546
3 mengubah file dengan 26 tambahan dan 7 penghapusan
  1. 2 2
      etc/emqx.conf
  2. 4 2
      priv/emqx.schema
  3. 20 3
      src/emqx_router.erl

+ 2 - 2
etc/emqx.conf

@@ -2268,8 +2268,8 @@ broker.shared_dispatch_ack_enabled = false
 ## Value: Flag
 broker.route_batch_clean = off
 
-## Performance toggle for subscribe/unsubscribe wildcard topic
-## change this toggle only when there are many wildcard topics.
+## Performance toggle for subscribe/unsubscribe wildcard topic.
+## Change this toggle only when there are many wildcard topics.
 ## Value: Enum
 ##  - key:   mnesia translational updates with per-key locks. recommended for single node setup.
 ##  - tab:   mnesia translational updates with table lock. recommended for multi-nodes setup.

+ 4 - 2
priv/emqx.schema

@@ -2254,11 +2254,13 @@ end}.
   {datatype, flag}
 ]}.
 
-%% @doc performance toggle for subscribe/unsubscribe wildcard topic
-%%      change this toggle only when there are many wildcard topics.
+%% @doc Performance toggle for subscribe/unsubscribe wildcard topic.
+%%      Change this toggle only when there are many wildcard topics.
 %% key:   mnesia translational updates with per-key locks. recommended for single node setup.
 %% tab:   mnesia translational updates with table lock. recommended for multi-nodes setup.
 %% global: global lock protected updates. recommended for larger cluster.
+%% NOTE: when changing from/to 'global' lock, it requires all nodes in the cluster
+%%
 {mapping, "broker.perf.route_lock_type", "emqx.route_lock_type", [
   {default, key},
   {datatype, {enum, [key, tab, global]}}

+ 20 - 3
src/emqx_router.erl

@@ -268,9 +268,26 @@ maybe_trans(Fun, Args) ->
 
 -spec(trans(function(), list(any())) -> ok | {error, term()}).
 trans(Fun, Args) ->
-    case mnesia:transaction(Fun, Args) of
-        {atomic, Ok} -> Ok;
-        {aborted, Reason} -> {error, Reason}
+    %% trigger selective receive optimization of compiler,
+    %% ideal for handling bursty traffic.
+    Ref = erlang:make_ref(),
+    Owner = self(),
+    {WPid, RefMon} = spawn_monitor(
+                       fun() ->
+                               Res = case mnesia:transaction(Fun, Args) of
+                                         {atomic, Ok} -> Ok;
+                                         {aborted, Reason} -> {error, Reason}
+                                     end,
+                               Owner ! {Ref, Res}
+                       end),
+    receive
+        {Ref, TransRes} ->
+            receive
+                {'DOWN', RefMon, process, WPid, normal} -> ok
+            end,
+            TransRes;
+        {'DOWN', RefMon, process, WPid, Info} ->
+            {error, {trans_crash, Info}}
     end.
 
 lock_router() ->