Przeglądaj źródła

Merge pull request #5123 from k32/dev/common_shard

Common EMQX shard
k32 4 lat temu
rodzic
commit
f6507a4d02

+ 3 - 1
apps/emqx/include/emqx.hrl

@@ -23,6 +23,9 @@
 
 -define(Otherwise, true).
 
+-define(COMMON_SHARD, emqx_common_shard).
+-define(SHARED_SUB_SHARD, emqx_shared_sub_shard).
+
 %%--------------------------------------------------------------------
 %% Banner
 %%--------------------------------------------------------------------
@@ -134,4 +137,3 @@
         }).
 
 -endif.
-

+ 19 - 10
apps/emqx/src/emqx_alarm.erl

@@ -90,6 +90,10 @@
 
 -define(DEACTIVATED_ALARM, emqx_deactivated_alarm).
 
+-rlog_shard({?COMMON_SHARD, ?ACTIVATED_ALARM}).
+-rlog_shard({?COMMON_SHARD, ?DEACTIVATED_ALARM}).
+
+
 -ifdef(TEST).
 -compile(export_all).
 -compile(nowarn_export_all).
@@ -182,7 +186,7 @@ handle_call({activate_alarm, Name, Details}, _From, State = #state{actions = Act
                                      details = Details,
                                      message = normalize_message(Name, Details),
                                      activate_at = erlang:system_time(microsecond)},
-            mnesia:dirty_write(?ACTIVATED_ALARM, Alarm),
+            ekka_mnesia:dirty_write(?ACTIVATED_ALARM, Alarm),
             do_actions(activate, Alarm, Actions),
             {reply, ok, State}
     end;
@@ -202,9 +206,14 @@ handle_call(delete_all_deactivated_alarms, _From, State) ->
     {reply, ok, State};
 
 handle_call({get_alarms, all}, _From, State) ->
-    Alarms = [normalize(Alarm) ||
-              Alarm <- ets:tab2list(?ACTIVATED_ALARM)
-                    ++ ets:tab2list(?DEACTIVATED_ALARM)],
+    {atomic, Alarms} =
+        ekka_mnesia:ro_transaction(
+          ?COMMON_SHARD,
+          fun() ->
+                  [normalize(Alarm) ||
+                      Alarm <- ets:tab2list(?ACTIVATED_ALARM)
+                          ++ ets:tab2list(?DEACTIVATED_ALARM)]
+          end),
     {reply, Alarms, State};
 
 handle_call({get_alarms, activated}, _From, State) ->
@@ -252,7 +261,7 @@ deactivate_alarm(Details, SizeLimit, Actions, #activated_alarm{
             case mnesia:dirty_first(?DEACTIVATED_ALARM) of
                 '$end_of_table' -> ok;
                 ActivateAt2 ->
-                    mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2)
+                    ekka_mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2)
             end;
         false -> ok
     end,
@@ -261,8 +270,8 @@ deactivate_alarm(Details, SizeLimit, Actions, #activated_alarm{
     DeActAlarm = make_deactivated_alarm(ActivateAt, Name, Details,
                     normalize_message(Name, Details),
                     erlang:system_time(microsecond)),
-    mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm),
-    mnesia:dirty_delete(?ACTIVATED_ALARM, Name),
+    ekka_mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm),
+    ekka_mnesia:dirty_delete(?ACTIVATED_ALARM, Name),
     do_actions(deactivate, DeActAlarm, Actions).
 
 make_deactivated_alarm(ActivateAt, Name, Details, Message, DeActivateAt) ->
@@ -279,7 +288,7 @@ deactivate_all_alarms() ->
                              details = Details,
                              message = Message,
                              activate_at = ActivateAt}) ->
-            mnesia:dirty_write(?DEACTIVATED_ALARM,
+            ekka_mnesia:dirty_write(?DEACTIVATED_ALARM,
                 #deactivated_alarm{
                     activate_at = ActivateAt,
                     name = Name,
@@ -291,7 +300,7 @@ deactivate_all_alarms() ->
 
 %% Delete all records from the given table, ignore result.
 clear_table(TableName) ->
-    case mnesia:clear_table(TableName) of
+    case ekka_mnesia:clear_table(TableName) of
         {aborted, Reason} ->
             ?LOG(warning, "Faile to clear table ~p reason: ~p",
                  [TableName, Reason]);
@@ -311,7 +320,7 @@ delete_expired_deactivated_alarms('$end_of_table', _Checkpoint) ->
 delete_expired_deactivated_alarms(ActivatedAt, Checkpoint) ->
     case ActivatedAt =< Checkpoint of
         true ->
-            mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivatedAt),
+            ekka_mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivatedAt),
             NActivatedAt = mnesia:dirty_next(?DEACTIVATED_ALARM, ActivatedAt),
             delete_expired_deactivated_alarms(NActivatedAt, Checkpoint);
         false ->

+ 1 - 1
apps/emqx/src/emqx_app.erl

@@ -28,7 +28,7 @@
 
 -define(APP, emqx).
 
--define(EMQX_SHARDS, [route_shard]).
+-define(EMQX_SHARDS, [?ROUTE_SHARD, ?COMMON_SHARD, ?SHARED_SUB_SHARD]).
 
 -include("emqx_release.hrl").
 

+ 10 - 9
apps/emqx/src/emqx_banned.erl

@@ -51,6 +51,8 @@
 
 -define(BANNED_TAB, ?MODULE).
 
+-rlog_shard({?COMMON_SHARD, ?BANNED_TAB}).
+
 %%--------------------------------------------------------------------
 %% Mnesia bootstrap
 %%--------------------------------------------------------------------
@@ -96,19 +98,19 @@ create(#{who    := Who,
          reason := Reason,
          at     := At,
          until  := Until}) ->
-    mnesia:dirty_write(?BANNED_TAB, #banned{who = Who,
-                                            by = By,
-                                            reason = Reason,
-                                            at = At,
-                                            until = Until});
+    ekka_mnesia:dirty_write(?BANNED_TAB, #banned{who = Who,
+                                                 by = By,
+                                                 reason = Reason,
+                                                 at = At,
+                                                 until = Until});
 create(Banned) when is_record(Banned, banned) ->
-    mnesia:dirty_write(?BANNED_TAB, Banned).
+    ekka_mnesia:dirty_write(?BANNED_TAB, Banned).
 
 -spec(delete({clientid, emqx_types:clientid()}
            | {username, emqx_types:username()}
            | {peerhost, emqx_types:peerhost()}) -> ok).
 delete(Who) ->
-    mnesia:dirty_delete(?BANNED_TAB, Who).
+    ekka_mnesia:dirty_delete(?BANNED_TAB, Who).
 
 info(InfoKey) ->
     mnesia:table_info(?BANNED_TAB, InfoKey).
@@ -129,7 +131,7 @@ handle_cast(Msg, State) ->
     {noreply, State}.
 
 handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
-    mnesia:async_dirty(fun expire_banned_items/1, [erlang:system_time(second)]),
+    ekka_mnesia:transaction(?COMMON_SHARD, fun expire_banned_items/1, [erlang:system_time(second)]),
     {noreply, ensure_expiry_timer(State), hibernate};
 
 handle_info(Info, State) ->
@@ -160,4 +162,3 @@ expire_banned_items(Now) ->
               mnesia:delete_object(?BANNED_TAB, B, sticky_write);
          (_, _Acc) -> ok
       end, ok, ?BANNED_TAB).
-

+ 6 - 4
apps/emqx/src/emqx_shared_sub.erl

@@ -77,6 +77,8 @@
 -define(NACK(Reason), {shared_sub_nack, Reason}).
 -define(NO_ACK, no_ack).
 
+-rlog_shard({?SHARED_SUB_SHARD, ?TAB}).
+
 -record(state, {pmon}).
 
 -record(emqx_shared_subscription, {group, topic, subpid}).
@@ -297,7 +299,7 @@ subscribers(Group, Topic) ->
 
 init([]) ->
     {ok, _} = mnesia:subscribe({table, ?TAB, simple}),
-    {atomic, PMon} = mnesia:transaction(fun init_monitors/0),
+    {atomic, PMon} = ekka_mnesia:transaction(?SHARED_SUB_SHARD, fun init_monitors/0),
     ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]),
     ok = emqx_tables:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]),
     {ok, update_stats(#state{pmon = PMon})}.
@@ -309,7 +311,7 @@ init_monitors() ->
       end, emqx_pmon:new(), ?TAB).
 
 handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) ->
-    mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)),
+    ekka_mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)),
     case ets:member(?SHARED_SUBS, {Group, Topic}) of
         true  -> ok;
         false -> ok = emqx_router:do_add_route(Topic, {Group, node()})
@@ -319,7 +321,7 @@ handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon
     {reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
 
 handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) ->
-    mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)),
+    ekka_mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)),
     true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
     delete_route_if_needed({Group, Topic}),
     {reply, ok, State};
@@ -373,7 +375,7 @@ cleanup_down(SubPid) ->
     ?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SUBS, SubPid),
     lists:foreach(
         fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) ->
-            ok = mnesia:dirty_delete_object(?TAB, Record),
+            ok = ekka_mnesia:dirty_delete_object(?TAB, Record),
             true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
             delete_route_if_needed({Group, Topic})
         end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})).

+ 10 - 6
apps/emqx_sn/src/emqx_sn_registry.erl

@@ -44,6 +44,8 @@
         , code_change/3
         ]).
 
+-define(SN_SHARD, emqx_sn_shard).
+
 -define(TAB, ?MODULE).
 
 -record(state, {max_predef_topic_id = 0}).
@@ -56,6 +58,7 @@
 -boot_mnesia({mnesia, [boot]}).
 -copy_mnesia({mnesia, [copy]}).
 
+-rlog_shard({?SN_SHARD, ?TAB}).
 
 %% @doc Create or replicate tables.
 -spec(mnesia(boot | copy) -> ok).
@@ -74,6 +77,7 @@ mnesia(copy) ->
 
 -spec(start_link(list()) -> {ok, pid()} | ignore | {error, Reason :: term()}).
 start_link(PredefTopics) ->
+    ekka_rlog:wait_for_shards([?SN_SHARD], infinity),
     gen_server:start_link({local, ?MODULE}, ?MODULE, [PredefTopics], []).
 
 -spec(stop() -> ok).
@@ -129,10 +133,10 @@ init([PredefTopics]) ->
     %% {ClientId, TopicName} -> TopicId
     MaxPredefId = lists:foldl(
                     fun({TopicId, TopicName}, AccId) ->
-                        mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicId},
-                                                             value = TopicName}),
-                        mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicName},
-                                                             value = TopicId}),
+                        ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicId},
+                                                                  value = TopicName}),
+                        ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicName},
+                                                                  value = TopicId}),
                         if TopicId > AccId -> TopicId; true -> AccId end
                     end, 0, PredefTopics),
     {ok, #state{max_predef_topic_id = MaxPredefId}}.
@@ -157,7 +161,7 @@ handle_call({register, ClientId, TopicName}, _From,
                         mnesia:write(#emqx_sn_registry{key = {ClientId, TopicId},
                                                             value = TopicName})
                     end,
-                    case mnesia:transaction(Fun) of
+                    case ekka_mnesia:transaction(?SN_SHARD, Fun) of
                         {atomic, ok} ->
                             {reply, TopicId, State};
                         {aborted, Error} ->
@@ -168,7 +172,7 @@ handle_call({register, ClientId, TopicName}, _From,
 
 handle_call({unregister, ClientId}, _From, State) ->
     Registry = mnesia:dirty_match_object({?TAB, {ClientId, '_'}, '_'}),
-    lists:foreach(fun(R) -> mnesia:dirty_delete_object(R) end, Registry),
+    lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(?TAB, R) end, Registry),
     {reply, ok, State};
 
 handle_call(Req, _From, State) ->

+ 2 - 2
apps/emqx_sn/test/emqx_sn_registry_SUITE.erl

@@ -41,9 +41,10 @@ end_per_suite(_Config) ->
     ok.
 
 init_per_testcase(_TestCase, Config) ->
+    application:set_env(ekka, strict_mode, true),
     ekka_mnesia:start(),
     emqx_sn_registry:mnesia(boot),
-    mnesia:clear_table(emqx_sn_registry),
+    ekka_mnesia:clear_table(emqx_sn_registry),
     PredefTopics = application:get_env(emqx_sn, predefined, []),
     {ok, _Pid} = ?REGISTRY:start_link(PredefTopics),
     Config.
@@ -118,4 +119,3 @@ register_a_lot(N, Max) when N < Max ->
     Topic = iolist_to_binary(["Topic", integer_to_list(N)]),
     ?assertEqual(N, ?REGISTRY:register_topic(<<"ClientId">>, Topic)),
     register_a_lot(N+1, Max).
-

+ 11 - 9
apps/emqx_telemetry/src/emqx_telemetry.erl

@@ -90,6 +90,8 @@
 
 -define(TELEMETRY, emqx_telemetry).
 
+-rlog_shard({?COMMON_SHARD, ?TELEMETRY}).
+
 %%--------------------------------------------------------------------
 %% Mnesia bootstrap
 %%--------------------------------------------------------------------
@@ -146,9 +148,9 @@ init([Opts]) ->
                  [] ->
                      Enabled = proplists:get_value(enabled, Opts, true),
                      UUID = generate_uuid(),
-                     mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID,
-                                                               uuid = UUID,
-                                                               enabled = Enabled}),
+                     ekka_mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID,
+                                                                    uuid = UUID,
+                                                                    enabled = Enabled}),
                      State#state{enabled = Enabled, uuid = UUID};
                  [#telemetry{uuid = UUID, enabled = Enabled} | _] ->
                      State#state{enabled = Enabled, uuid = UUID}
@@ -162,16 +164,16 @@ init([Opts]) ->
     end.
 
 handle_call(enable, _From, State = #state{uuid = UUID}) ->
-    mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID,
-                                              uuid = UUID,
-                                              enabled = true}),
+    ekka_mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID,
+                                                   uuid = UUID,
+                                                   enabled = true}),
     _ = erlang:send(self(), first_report),
     {reply, ok, State#state{enabled = true}};
 
 handle_call(disable, _From, State = #state{uuid = UUID}) ->
-    mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID,
-                                              uuid = UUID,
-                                              enabled = false}),
+    ekka_mnesia:dirty_write(?TELEMETRY, #telemetry{id = ?UNIQUE_ID,
+                                                   uuid = UUID,
+                                                   enabled = false}),
     {reply, ok, State#state{enabled = false}};
 
 handle_call(is_enabled, _From, State = #state{enabled = Enabled}) ->