Просмотр исходного кода

chore(mria): ekka_mnesia:running_nodes -> mria:running_nodes

k32 4 лет назад
Родитель
Сommit
ff48322e0c

+ 1 - 1
apps/emqx/src/emqx_sys.erl

@@ -150,7 +150,7 @@ handle_info({timeout, TRef, tick},
             State = #state{ticker = TRef, version = Version, sysdescr = Descr}) ->
     publish_any(version, Version),
     publish_any(sysdescr, Descr),
-    publish_any(brokers, ekka_mnesia:running_nodes()),
+    publish_any(brokers, mria_mnesia:running_nodes()),
     publish_any(stats, emqx_stats:getstats()),
     publish_any(metrics, emqx_metrics:all()),
     {noreply, tick(State), hibernate};

+ 2 - 2
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -153,7 +153,7 @@ param_path_operation()->
     }.
 
 list_bridges(get, _Params) ->
-    {200, lists:append([list_local_bridges(Node) || Node <- ekka_mnesia:running_nodes()])}.
+    {200, lists:append([list_local_bridges(Node) || Node <- mria_mnesia:running_nodes()])}.
 
 list_local_bridges(Node) when Node =:= node() ->
     [format_resp(Data) || Data <- emqx_bridge:list_bridges()];
@@ -161,7 +161,7 @@ list_local_bridges(Node) ->
     rpc_call(Node, list_local_bridges, [Node]).
 
 crud_bridges_cluster(Method, Params) ->
-    Results = [crud_bridges(Node, Method, Params) || Node <- ekka_mnesia:running_nodes()],
+    Results = [crud_bridges(Node, Method, Params) || Node <- mria_mnesia:running_nodes()],
     case lists:filter(fun({200}) -> false; ({200, _}) -> false; (_) -> true end, Results) of
         [] ->
             case Results of

+ 5 - 5
apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl

@@ -158,8 +158,8 @@ counters(get, #{bindings := #{counter := Counter}}) ->
     lookup([{<<"counter">>, Counter}]).
 
 current_counters(get, _Params) ->
-    Data = [get_collect(Node) || Node <- ekka_mnesia:running_nodes()],
-    Nodes = length(ekka_mnesia:running_nodes()),
+    Data = [get_collect(Node) || Node <- mria_mnesia:running_nodes()],
+    Nodes = length(mria_mnesia:running_nodes()),
     {Received, Sent, Sub, Conn} = format_current_metrics(Data),
     Response = #{
         nodes           => Nodes,
@@ -194,16 +194,16 @@ lookup_(#{node := Node, counter := Counter}) ->
 lookup_(#{node := Node}) ->
     {200, sampling(Node)};
 lookup_(#{counter := Counter}) ->
-    CounterData = merger_counters([sampling(Node, Counter) || Node <- ekka_mnesia:running_nodes()]),
+    CounterData = merger_counters([sampling(Node, Counter) || Node <- mria_mnesia:running_nodes()]),
     Data = hd(maps:values(CounterData)),
     {200, Data}.
 
 list_collect(Aggregate) ->
     case Aggregate of
         <<"true">> ->
-            [maps:put(node, Node, sampling(Node)) || Node <- ekka_mnesia:running_nodes()];
+            [maps:put(node, Node, sampling(Node)) || Node <- mria_mnesia:running_nodes()];
         _ ->
-            Counters = [sampling(Node) || Node <- ekka_mnesia:running_nodes()],
+            Counters = [sampling(Node) || Node <- mria_mnesia:running_nodes()],
             merger_counters(Counters)
     end.
 

+ 2 - 2
apps/emqx_gateway/src/emqx_gateway_http.erl

@@ -209,7 +209,7 @@ confexp({error, already_exist}) ->
                     emqx_type:clientid(), {atom(), atom()}) -> list().
 lookup_client(GwName, ClientId, FormatFun) ->
     lists:append([lookup_client(Node, GwName, {clientid, ClientId}, FormatFun)
-                  || Node <- ekka_mnesia:running_nodes()]).
+                  || Node <- mria_mnesia:running_nodes()]).
 
 lookup_client(Node, GwName, {clientid, ClientId}, {M,F}) when Node =:= node() ->
     ChanTab = emqx_gateway_cm:tabname(chan, GwName),
@@ -229,7 +229,7 @@ lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) ->
      | ok.
 kickout_client(GwName, ClientId) ->
     Results = [kickout_client(Node, GwName, ClientId)
-               || Node <- ekka_mnesia:running_nodes()],
+               || Node <- mria_mnesia:running_nodes()],
     case lists:any(fun(Item) -> Item =:= ok end, Results) of
         true  -> ok;
         false -> lists:last(Results)

+ 20 - 22
apps/emqx_management/src/emqx_mgmt.erl

@@ -164,7 +164,7 @@ stopped_node_info(Node) ->
 %%--------------------------------------------------------------------
 
 list_brokers() ->
-    [{Node, broker_info(Node)} || Node <- ekka_mnesia:running_nodes()].
+    [{Node, broker_info(Node)} || Node <- mria_mnesia:running_nodes()].
 
 lookup_broker(Node) ->
     broker_info(Node).
@@ -181,7 +181,7 @@ broker_info(Node) ->
 %%--------------------------------------------------------------------
 
 get_metrics() ->
-    nodes_info_count([get_metrics(Node) || Node <- ekka_mnesia:running_nodes()]).
+    nodes_info_count([get_metrics(Node) || Node <- mria_mnesia:running_nodes()]).
 
 get_metrics(Node) when Node =:= node() ->
     emqx_metrics:all();
@@ -201,7 +201,7 @@ get_stats() ->
         begin
             Stats = get_stats(Node),
             delete_keys(Stats, GlobalStatsKeys)
-        end || Node <- ekka_mnesia:running_nodes()]),
+        end || Node <- mria_mnesia:running_nodes()]),
     GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(get_stats(node()))),
     maps:merge(CountStats, GlobalStats).
 
@@ -232,10 +232,10 @@ nodes_info_count(PropList) ->
 %%--------------------------------------------------------------------
 
 lookup_client({clientid, ClientId}, FormatFun) ->
-    lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun) || Node <- ekka_mnesia:running_nodes()]);
+    lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun) || Node <- mria_mnesia:running_nodes()]);
 
 lookup_client({username, Username}, FormatFun) ->
-    lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- ekka_mnesia:running_nodes()]).
+    lists:append([lookup_client(Node, {username, Username}, FormatFun) || Node <- mria_mnesia:running_nodes()]).
 
 lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() ->
     lists:append(lists:map(
@@ -257,7 +257,7 @@ lookup_client(Node, {username, Username}, FormatFun) ->
     rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]).
 
 kickout_client(ClientId) ->
-    Results = [kickout_client(Node, ClientId) || Node <- ekka_mnesia:running_nodes()],
+    Results = [kickout_client(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
     case lists:any(fun(Item) -> Item =:= ok end, Results) of
         true  -> ok;
         false -> lists:last(Results)
@@ -273,7 +273,7 @@ list_authz_cache(ClientId) ->
     call_client(ClientId, list_authz_cache).
 
 list_client_subscriptions(ClientId) ->
-    Results = [client_subscriptions(Node, ClientId) || Node <- ekka_mnesia:running_nodes()],
+    Results = [client_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
     Expected = lists:filter(fun({error, _}) -> false;
                                ([]) -> false;
                                (_) -> true
@@ -290,7 +290,7 @@ client_subscriptions(Node, ClientId) ->
     rpc_call(Node, client_subscriptions, [Node, ClientId]).
 
 clean_authz_cache(ClientId) ->
-    Results = [clean_authz_cache(Node, ClientId) || Node <- ekka_mnesia:running_nodes()],
+    Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
     case lists:any(fun(Item) -> Item =:= ok end, Results) of
         true  -> ok;
         false -> lists:last(Results)
@@ -308,7 +308,7 @@ clean_authz_cache(Node, ClientId) ->
     rpc_call(Node, clean_authz_cache, [Node, ClientId]).
 
 clean_authz_cache_all() ->
-    Results = [{Node, clean_authz_cache_all(Node)} || Node <- ekka_mnesia:running_nodes()],
+    Results = [{Node, clean_authz_cache_all(Node)} || Node <- mria_mnesia:running_nodes()],
     case lists:filter(fun({_Node, Item}) -> Item =/= ok end, Results) of
         []  -> ok;
         BadNodes -> {error, BadNodes}
@@ -328,7 +328,7 @@ set_quota_policy(ClientId, Policy) ->
 
 %% @private
 call_client(ClientId, Req) ->
-    Results = [call_client(Node, ClientId, Req) || Node <- ekka_mnesia:running_nodes()],
+    Results = [call_client(Node, ClientId, Req) || Node <- mria_mnesia:running_nodes()],
     Expected = lists:filter(fun({error, _}) -> false;
                                (_) -> true
                             end, Results),
@@ -366,7 +366,7 @@ list_subscriptions(Node) ->
     rpc_call(Node, list_subscriptions, [Node]).
 
 list_subscriptions_via_topic(Topic, FormatFun) ->
-    lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun) || Node <- ekka_mnesia:running_nodes()]).
+    lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun) || Node <- mria_mnesia:running_nodes()]).
 
 list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() ->
     MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=','$1', Topic}], ['$_']}],
@@ -376,7 +376,7 @@ list_subscriptions_via_topic(Node, Topic, FormatFun) ->
     rpc_call(Node, list_subscriptions_via_topic, [Node, Topic, FormatFun]).
 
 lookup_subscriptions(ClientId) ->
-    lists:append([lookup_subscriptions(Node, ClientId) || Node <- ekka_mnesia:running_nodes()]).
+    lists:append([lookup_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()]).
 
 lookup_subscriptions(Node, ClientId) when Node =:= node() ->
     case ets:lookup(emqx_subid, ClientId) of
@@ -400,7 +400,7 @@ lookup_routes(Topic) ->
 %%--------------------------------------------------------------------
 
 subscribe(ClientId, TopicTables) ->
-    subscribe(ekka_mnesia:running_nodes(), ClientId, TopicTables).
+    subscribe(mria_mnesia:running_nodes(), ClientId, TopicTables).
 
 subscribe([Node | Nodes], ClientId, TopicTables) ->
     case rpc_call(Node, do_subscribe, [ClientId, TopicTables]) of
@@ -424,7 +424,7 @@ publish(Msg) ->
     emqx:publish(Msg).
 
 unsubscribe(ClientId, Topic) ->
-    unsubscribe(ekka_mnesia:running_nodes(), ClientId, Topic).
+    unsubscribe(mria_mnesia:running_nodes(), ClientId, Topic).
 
 unsubscribe([Node | Nodes], ClientId, Topic) ->
     case rpc_call(Node, do_unsubscribe, [ClientId, Topic]) of
@@ -447,7 +447,7 @@ do_unsubscribe(ClientId, Topic) ->
 %%--------------------------------------------------------------------
 
 list_plugins() ->
-    [{Node, list_plugins(Node)} || Node <- ekka_mnesia:running_nodes()].
+    [{Node, list_plugins(Node)} || Node <- mria_mnesia:running_nodes()].
 
 list_plugins(Node) when Node =:= node() ->
     emqx_plugins:list();
@@ -474,7 +474,7 @@ reload_plugin(Node, Plugin) ->
 %%--------------------------------------------------------------------
 
 list_listeners() ->
-    lists:append([list_listeners(Node) || Node <- ekka_mnesia:running_nodes()]).
+    lists:append([list_listeners(Node) || Node <- mria_mnesia:running_nodes()]).
 
 list_listeners(Node) when Node =:= node() ->
     [Conf#{node => Node, id => Id} || {Id, Conf} <- emqx_listeners:list()];
@@ -505,7 +505,7 @@ manage_listener(Operation, Param = #{node := Node}) ->
     rpc_call(Node, manage_listener, [Operation, Param]).
 
 update_listener(Id, Config) ->
-    [update_listener(Node, Id, Config) || Node <- ekka_mnesia:running_nodes()].
+    [update_listener(Node, Id, Config) || Node <- mria_mnesia:running_nodes()].
 
 update_listener(Node, Id, Config) when Node =:= node() ->
     case emqx_listeners:parse_listener_id(Id) of
@@ -523,7 +523,7 @@ update_listener(Node, Id, Config) ->
     rpc_call(Node, update_listener, [Node, Id, Config]).
 
 remove_listener(Id) ->
-    [remove_listener(Node, Id) || Node <- ekka_mnesia:running_nodes()].
+    [remove_listener(Node, Id) || Node <- mria_mnesia:running_nodes()].
 
 remove_listener(Node, Id) when Node =:= node() ->
     {Type, Name} = emqx_listeners:parse_listener_id(Id),
@@ -540,7 +540,7 @@ remove_listener(Node, Id) ->
 %%--------------------------------------------------------------------
 
 get_alarms(Type) ->
-    [{Node, get_alarms(Node, Type)} || Node <- ekka_mnesia:running_nodes()].
+    [{Node, get_alarms(Node, Type)} || Node <- mria_mnesia:running_nodes()].
 
 get_alarms(Node, Type) when Node =:= node() ->
     add_duration_field(emqx_alarm:get_alarms(Type));
@@ -553,7 +553,7 @@ deactivate(Node, Name) ->
     rpc_call(Node, deactivate, [Node, Name]).
 
 delete_all_deactivated_alarms() ->
-    [delete_all_deactivated_alarms(Node) || Node <- ekka_mnesia:running_nodes()].
+    [delete_all_deactivated_alarms(Node) || Node <- mria_mnesia:running_nodes()].
 
 delete_all_deactivated_alarms(Node) when Node =:= node() ->
     emqx_alarm:delete_all_deactivated_alarms();
@@ -621,5 +621,3 @@ max_row_limit() ->
     ?MAX_ROW_LIMIT.
 
 table_size(Tab) -> ets:info(Tab, size).
-
-

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_api.erl

@@ -167,7 +167,7 @@ cluster_query(Params, Tab, QsSchema, QueryFun) ->
     {_CodCnt, Qs} = params2qs(Params, QsSchema),
     Limit = b2i(limit(Params)),
     Page  = b2i(page(Params)),
-    Nodes = ekka_mnesia:running_nodes(),
+    Nodes = mria_mnesia:running_nodes(),
     Meta = #{page => Page, limit => Limit, count => 0},
     page_limit_check_query(Meta, {fun do_cluster_query/5, [Nodes, Tab, Qs, QueryFun, Meta]}).
 

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_api_listeners.erl

@@ -300,7 +300,7 @@ manage_listeners(_, #{bindings := #{id := Id, operation := Oper, node := Node}})
     Result;
 
 manage_listeners(_, #{bindings := #{id := Id, operation := Oper}}) ->
-    Results = [do_manage_listeners(Node, Id, Oper) || Node <- ekka_mnesia:running_nodes()],
+    Results = [do_manage_listeners(Node, Id, Oper) || Node <- mria_mnesia:running_nodes()],
     case lists:filter(fun({_, {200}}) -> false; (_) -> true end, Results) of
         [] -> {200};
         Errors -> {500, #{code => 'UNKNOW_ERROR', message => manage_listeners_err(Errors)}}

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_api_metrics.erl

@@ -154,6 +154,6 @@ list(get, #{query_string := Qs}) ->
             {200, emqx_mgmt:get_metrics()};
         _ ->
             Data = [maps:from_list(emqx_mgmt:get_metrics(Node) ++ [{node, Node}]) ||
-                        Node <- ekka_mnesia:running_nodes()],
+                        Node <- mria_mnesia:running_nodes()],
             {200, Data}
     end.

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_api_stats.erl

@@ -96,6 +96,6 @@ list(get, #{query_string := Qs}) ->
             {200, emqx_mgmt:get_stats()};
         _ ->
             Data = [maps:from_list(emqx_mgmt:get_stats(Node) ++ [{node, Node}]) ||
-                        Node <- ekka_mnesia:running_nodes()],
+                        Node <- mria_mnesia:running_nodes()],
             {200, Data}
     end.

+ 1 - 1
apps/emqx_modules/src/emqx_delayed_api.erl

@@ -210,7 +210,7 @@ generate_max_delayed_messages(Config) ->
 update_config_(Config) ->
     lists:foreach(fun(Node) ->
         update_config_(Node, Config)
-                  end, ekka_mnesia:running_nodes()).
+                  end, mria_mnesia:running_nodes()).
 
 update_config_(Node, Config) when Node =:= node() ->
     _ = emqx_delayed:update_config(Config),

+ 1 - 1
apps/emqx_modules/src/emqx_telemetry.erl

@@ -268,7 +268,7 @@ uptime() ->
     element(1, erlang:statistics(wall_clock)).
 
 nodes_uuid() ->
-    Nodes = lists:delete(node(), ekka_mnesia:running_nodes()),
+    Nodes = lists:delete(node(), mria_mnesia:running_nodes()),
     lists:foldl(fun(Node, Acc) ->
                     case rpc:call(Node, ?MODULE, get_uuid, []) of
                         {badrpc, _Reason} ->

+ 1 - 1
apps/emqx_modules/src/emqx_telemetry_api.erl

@@ -152,7 +152,7 @@ data(get, _Request) ->
 enable_telemetry(Enable) ->
     lists:foreach(fun(Node) ->
         enable_telemetry(Node, Enable)
-    end, ekka_mnesia:running_nodes()).
+    end, mria_mnesia:running_nodes()).
 
 enable_telemetry(Node, Enable) when Node =:= node() ->
     case Enable of

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -338,4 +338,4 @@ do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) ->
 
 get_rule_metrics(Id) ->
     [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id]))
-     || Node <- ekka_mnesia:running_nodes()].
+     || Node <- mria_mnesia:running_nodes()].