k32 4 лет назад
Родитель
Сommit
ce4800e6ae
2 измененных файлов с 21 добавлено и 12 удалено
  1. 19 10
      apps/emqx/src/emqx_alarm.erl
  2. 2 2
      apps/emqx_sn/src/emqx_sn_registry.erl

+ 19 - 10
apps/emqx/src/emqx_alarm.erl

@@ -90,6 +90,10 @@
 
 -define(DEACTIVATED_ALARM, emqx_deactivated_alarm).
 
+-rlog_shard({?COMMON_SHARD, ?ACTIVATED_ALARM}).
+-rlog_shard({?COMMON_SHARD, ?DEACTIVATED_ALARM}).
+
+
 -ifdef(TEST).
 -compile(export_all).
 -compile(nowarn_export_all).
@@ -182,7 +186,7 @@ handle_call({activate_alarm, Name, Details}, _From, State = #state{actions = Act
                                      details = Details,
                                      message = normalize_message(Name, Details),
                                      activate_at = erlang:system_time(microsecond)},
-            mnesia:dirty_write(?ACTIVATED_ALARM, Alarm),
+            ekka_mnesia:dirty_write(?ACTIVATED_ALARM, Alarm),
             do_actions(activate, Alarm, Actions),
             {reply, ok, State}
     end;
@@ -202,9 +206,14 @@ handle_call(delete_all_deactivated_alarms, _From, State) ->
     {reply, ok, State};
 
 handle_call({get_alarms, all}, _From, State) ->
-    Alarms = [normalize(Alarm) ||
-              Alarm <- ets:tab2list(?ACTIVATED_ALARM)
-                    ++ ets:tab2list(?DEACTIVATED_ALARM)],
+    {atomic, Alarms} =
+        ekka_mnesia:ro_transaction(
+          ?COMMON_SHARD,
+          fun() ->
+                  [normalize(Alarm) ||
+                      Alarm <- ets:tab2list(?ACTIVATED_ALARM)
+                          ++ ets:tab2list(?DEACTIVATED_ALARM)]
+          end),
     {reply, Alarms, State};
 
 handle_call({get_alarms, activated}, _From, State) ->
@@ -252,7 +261,7 @@ deactivate_alarm(Details, SizeLimit, Actions, #activated_alarm{
             case mnesia:dirty_first(?DEACTIVATED_ALARM) of
                 '$end_of_table' -> ok;
                 ActivateAt2 ->
-                    mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2)
+                    ekka_mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2)
             end;
         false -> ok
     end,
@@ -261,8 +270,8 @@ deactivate_alarm(Details, SizeLimit, Actions, #activated_alarm{
     DeActAlarm = make_deactivated_alarm(ActivateAt, Name, Details,
                     normalize_message(Name, Details),
                     erlang:system_time(microsecond)),
-    mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm),
-    mnesia:dirty_delete(?ACTIVATED_ALARM, Name),
+    ekka_mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm),
+    ekka_mnesia:dirty_delete(?ACTIVATED_ALARM, Name),
     do_actions(deactivate, DeActAlarm, Actions).
 
 make_deactivated_alarm(ActivateAt, Name, Details, Message, DeActivateAt) ->
@@ -279,7 +288,7 @@ deactivate_all_alarms() ->
                              details = Details,
                              message = Message,
                              activate_at = ActivateAt}) ->
-            mnesia:dirty_write(?DEACTIVATED_ALARM,
+            ekka_mnesia:dirty_write(?DEACTIVATED_ALARM,
                 #deactivated_alarm{
                     activate_at = ActivateAt,
                     name = Name,
@@ -291,7 +300,7 @@ deactivate_all_alarms() ->
 
 %% Delete all records from the given table, ignore result.
 clear_table(TableName) ->
-    case mnesia:clear_table(TableName) of
+    case ekka_mnesia:clear_table(TableName) of
         {aborted, Reason} ->
             ?LOG(warning, "Faile to clear table ~p reason: ~p",
                  [TableName, Reason]);
@@ -311,7 +320,7 @@ delete_expired_deactivated_alarms('$end_of_table', _Checkpoint) ->
 delete_expired_deactivated_alarms(ActivatedAt, Checkpoint) ->
     case ActivatedAt =< Checkpoint of
         true ->
-            mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivatedAt),
+            ekka_mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivatedAt),
             NActivatedAt = mnesia:dirty_next(?DEACTIVATED_ALARM, ActivatedAt),
             delete_expired_deactivated_alarms(NActivatedAt, Checkpoint);
         false ->

+ 2 - 2
apps/emqx_sn/src/emqx_sn_registry.erl

@@ -77,7 +77,7 @@ mnesia(copy) ->
 
 -spec(start_link(list()) -> {ok, pid()} | ignore | {error, Reason :: term()}).
 start_link(PredefTopics) ->
-    ekka_mnesia:wait_for_shards([?SN_SHARD], infinity),
+    ekka_rlog:wait_for_shards([?SN_SHARD], infinity),
     gen_server:start_link({local, ?MODULE}, ?MODULE, [PredefTopics], []).
 
 -spec(stop() -> ok).
@@ -172,7 +172,7 @@ handle_call({register, ClientId, TopicName}, _From,
 
 handle_call({unregister, ClientId}, _From, State) ->
     Registry = mnesia:dirty_match_object({?TAB, {ClientId, '_'}, '_'}),
-    lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(R) end, Registry),
+    lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(?TAB, R) end, Registry),
     {reply, ok, State};
 
 handle_call(Req, _From, State) ->