Przeglądaj źródła

chore(rule_engine): Add an RLOG shard

k32 4 lat temu
rodzic
commit
73ec8c47cc

+ 3 - 0
apps/emqx/include/emqx.hrl

@@ -89,6 +89,9 @@
 
 -define(ROUTE_SHARD, route_shard).
 
+
+-define(RULE_ENGINE_SHARD, emqx_rule_engine_shard).
+
 -record(route, {
           topic :: binary(),
           dest  :: node() | {binary(), node()}

+ 5 - 1
apps/emqx/src/emqx_app.erl

@@ -28,7 +28,11 @@
 
 -define(APP, emqx).
 
--define(EMQX_SHARDS, [?ROUTE_SHARD, ?COMMON_SHARD, ?SHARED_SUB_SHARD]).
+-define(EMQX_SHARDS, [ ?ROUTE_SHARD
+                     , ?COMMON_SHARD
+                     , ?SHARED_SUB_SHARD
+                     , ?RULE_ENGINE_SHARD
+                     ]).
 
 -include("emqx_release.hrl").
 

+ 16 - 3
apps/emqx_rule_engine/src/emqx_rule_registry.erl

@@ -19,6 +19,7 @@
 -behaviour(gen_server).
 
 -include("rule_engine.hrl").
+-include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("stdlib/include/qlc.hrl").
 
@@ -95,6 +96,11 @@
 
 -define(T_CALL, 10000).
 
+-rlog_shard({?RULE_ENGINE_SHARD, ?RULE_TAB}).
+-rlog_shard({?RULE_ENGINE_SHARD, ?ACTION_TAB}).
+-rlog_shard({?RULE_ENGINE_SHARD, ?RES_TAB}).
+-rlog_shard({?RULE_ENGINE_SHARD, ?RES_TYPE_TAB}).
+
 %%------------------------------------------------------------------------------
 %% Mnesia bootstrap
 %%------------------------------------------------------------------------------
@@ -174,7 +180,7 @@ get_rules_ordered_by_ts() ->
         Query = qlc:q([E || E <- mnesia:table(?RULE_TAB)]),
         qlc:e(qlc:keysort(#rule.created_at, Query, [{order, ascending}]))
     end,
-    {atomic, List} = mnesia:transaction(F),
+    {atomic, List} = ekka_mnesia:transaction(?RULE_ENGINE_SHARD, F),
     List.
 
 -spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())).
@@ -471,11 +477,18 @@ code_change(_OldVsn, State, _Extra) ->
 
 get_all_records(Tab) ->
     %mnesia:dirty_match_object(Tab, mnesia:table_info(Tab, wild_pattern)).
-    ets:tab2list(Tab).
+    %% Wrapping ets to a r/o transaction to avoid reading inconsistent
+    %% data during shard bootstrap
+    {atomic, Ret} =
+        ekka_mnesia:ro_transaction(?RULE_ENGINE_SHARD,
+                                   fun() ->
+                                           ets:tab2list(Tab)
+                                   end),
+    Ret.
 
 trans(Fun) -> trans(Fun, []).
 trans(Fun, Args) ->
-    case mnesia:transaction(Fun, Args) of
+    case ekka_mnesia:transaction(?RULE_ENGINE_SHARD, Fun, Args) of
         {atomic, Result} -> Result;
         {aborted, Reason} -> error(Reason)
     end.