Jelajahi Sumber

fix: monitor bad mnesia write & rpc call (#6060)

DDDHuang 4 tahun lalu
induk
melakukan
1476accd63

+ 2 - 0
apps/emqx_dashboard/include/emqx_dashboard.hrl

@@ -33,6 +33,8 @@
     extra = [] :: term() %% not used so far, fur future extension
     }).
 
+-define(TAB_COLLECT, emqx_collect).
+
 -define(EMPTY_KEY(Key), ((Key == undefined) orelse (Key == <<>>))).
 
 -define(DASHBOARD_SHARD, emqx_dashboard_shard).

+ 16 - 10
apps/emqx_dashboard/src/emqx_dashboard_collection.erl

@@ -40,10 +40,10 @@
 -define(EXPIRE_INTERVAL, 86400000 * 7).
 
 mnesia(boot) ->
-    ok = mria:create_table(emqx_collect, [
+    ok = mria:create_table(?TAB_COLLECT, [
         {type, set},
         {local_content, true},
-        {storage, disc_only_copies},
+        {storage, disc_copies},
         {record_name, mqtt_collect},
         {attributes, record_info(fields, mqtt_collect)}]).
 
@@ -87,7 +87,10 @@ handle_call(_Req, _From, State) ->
 handle_cast(_Req, State) ->
     {noreply, State}.
 
-handle_info(collect, State = #{collect := Collect, count := 1, temp_collect := TempCollect, last_collects := LastCollect}) ->
+handle_info(collect, State = #{ collect := Collect
+                              , count := 1
+                              , temp_collect := TempCollect
+                              , last_collects := LastCollect}) ->
     timer(next_interval(), collect),
     NewLastCollect = flush(collect_all(Collect), LastCollect),
     TempCollect1 = temp_collect(TempCollect),
@@ -107,9 +110,9 @@ handle_info(clear_expire_data, State = #{expire_interval := ExpireInterval}) ->
     timer(?CLEAR_INTERVAL, clear_expire_data),
     T1 = get_local_time(),
     Spec = ets:fun2ms(fun({_, T, _C} = Data) when (T1 - T) > ExpireInterval -> Data end),
-    Collects = dets:select(emqx_collect, Spec),
+    Collects = ets:select(?TAB_COLLECT, Spec),
     lists:foreach(fun(Collect) ->
-        dets:delete_object(emqx_collect, Collect)
+        true = ets:delete_object(?TAB_COLLECT, Collect)
     end, Collects),
     {noreply, State, hibernate};
 
@@ -131,9 +134,9 @@ temp_collect({_, _, Received, Sent}) ->
      Sent1}.
 
 collect_all({Connection, Route, Subscription}) ->
-    {[collect(connections)| Connection],
-     [collect(routes)| Route],
-     [collect(subscriptions)| Subscription]}.
+    {[collect(connections) | Connection],
+     [collect(routes) | Route],
+     [collect(subscriptions) | Subscription]}.
 
 collect(connections) ->
     emqx_stats:getstat('connections.count');
@@ -159,8 +162,11 @@ flush({Connection, Route, Subscription}, {Received0, Sent0, Dropped0}) ->
                diff(Sent, Sent0),
                diff(Dropped, Dropped0)},
     Ts = get_local_time(),
-    _ = mria:transaction(mria:local_content_shard(),
-                         fun mnesia:write/1, [#mqtt_collect{timestamp = Ts, collect = Collect}]),
+    {atomic, ok} = mria:transaction(mria:local_content_shard(),
+                                    fun mnesia:write/3,
+                                    [ ?TAB_COLLECT
+                                    , #mqtt_collect{timestamp = Ts, collect = Collect}
+                                    , write]),
     {Received, Sent, Dropped}.
 
 avg(Items) ->

+ 14 - 8
apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl

@@ -18,6 +18,10 @@
         , current_counters/2
         ]).
 
+-export([ sampling/1
+        , sampling/2
+        ]).
+
 -define(COUNTERS, [ connection
                   , route
                   , subscriptions
@@ -174,8 +178,10 @@ format_current_metrics(Collects) ->
     format_current_metrics(Collects, {0,0,0,0}).
 format_current_metrics([], Acc) ->
     Acc;
-format_current_metrics([{Received, Sent, Sub, Conn} | Collects], {Received1, Sent1, Sub1, Conn1}) ->
-    format_current_metrics(Collects, {Received1 + Received, Sent1 + Sent, Sub1 + Sub, Conn1 + Conn}).
+format_current_metrics([{Received, Sent, Sub, Conn} | Collects],
+        {Received1, Sent1, Sub1, Conn1}) ->
+    format_current_metrics(Collects,
+        {Received1 + Received, Sent1 + Sent, Sub1 + Sub, Conn1 + Conn}).
 
 
 %%%==============================================================================================
@@ -260,19 +266,19 @@ key_replace([Term | List], All, Comparison, Default) ->
     end.
 
 sampling(Node) when Node =:= node() ->
-    Time = emqx_dashboard_collection:get_local_time() - 7200000,
-    All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]),
-    format(lists:sort(All));
+    format(lists:sort(select_data()));
 sampling(Node) ->
     rpc:call(Node, ?MODULE, sampling, [Node]).
 
 sampling(Node, Counter) when Node =:= node() ->
-    Time = emqx_dashboard_collection:get_local_time() - 7200000,
-    All = dets:select(emqx_collect, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]),
-    format_single(lists:sort(All), Counter);
+    format_single(lists:sort(select_data()), Counter);
 sampling(Node, Counter) ->
     rpc:call(Node, ?MODULE, sampling, [Node, Counter]).
 
+select_data() ->
+    Time = emqx_dashboard_collection:get_local_time() - 7200000,
+    ets:select(?TAB_COLLECT, [{{mqtt_collect,'$1','$2'}, [{'>', '$1', Time}], ['$_']}]).
+
 format(Collects) ->
     format(Collects, {[],[],[],[],[],[]}).
 format([], {Connection, Route, Subscription, Received, Sent, Dropped}) ->