|
@@ -124,7 +124,7 @@ clear_history() ->
|
|
|
gen_server:call(?MODULE, ?FUNCTION_NAME, ?DEF_CALL_TIMEOUT).
|
|
gen_server:call(?MODULE, ?FUNCTION_NAME, ?DEF_CALL_TIMEOUT).
|
|
|
|
|
|
|
|
update_settings(Conf) ->
|
|
update_settings(Conf) ->
|
|
|
- emqx_conf:update([emqx_slow_subs], Conf, #{override_to => cluster}).
|
|
|
|
|
|
|
+ emqx_conf:update([slow_subs], Conf, #{override_to => cluster}).
|
|
|
|
|
|
|
|
init_topk_tab() ->
|
|
init_topk_tab() ->
|
|
|
case ets:whereis(?TOPK_TAB) of
|
|
case ets:whereis(?TOPK_TAB) of
|
|
@@ -146,7 +146,7 @@ post_config_update(_KeyPath, _UpdateReq, NewConf, _OldConf, _AppEnvs) ->
|
|
|
%%--------------------------------------------------------------------
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
init([]) ->
|
|
init([]) ->
|
|
|
- emqx_conf:add_handler([emqx_slow_subs], ?MODULE),
|
|
|
|
|
|
|
+ emqx_conf:add_handler([slow_subs], ?MODULE),
|
|
|
|
|
|
|
|
InitState = #{enable => false,
|
|
InitState = #{enable => false,
|
|
|
last_tick_at => 0,
|
|
last_tick_at => 0,
|
|
@@ -154,11 +154,11 @@ init([]) ->
|
|
|
notice_timer => undefined
|
|
notice_timer => undefined
|
|
|
},
|
|
},
|
|
|
|
|
|
|
|
- Enable = emqx:get_config([emqx_slow_subs, enable]),
|
|
|
|
|
|
|
+ Enable = emqx:get_config([slow_subs, enable]),
|
|
|
{ok, check_enable(Enable, InitState)}.
|
|
{ok, check_enable(Enable, InitState)}.
|
|
|
|
|
|
|
|
handle_call({update_settings, #{enable := Enable} = Conf}, _From, State) ->
|
|
handle_call({update_settings, #{enable := Enable} = Conf}, _From, State) ->
|
|
|
- emqx_config:put([emqx_slow_subs], Conf),
|
|
|
|
|
|
|
+ emqx_config:put([slow_subs], Conf),
|
|
|
State2 = check_enable(Enable, State),
|
|
State2 = check_enable(Enable, State),
|
|
|
{reply, ok, State2};
|
|
{reply, ok, State2};
|
|
|
|
|
|
|
@@ -204,7 +204,7 @@ expire_tick() ->
|
|
|
erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME).
|
|
erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME).
|
|
|
|
|
|
|
|
notice_tick() ->
|
|
notice_tick() ->
|
|
|
- case emqx:get_config([emqx_slow_subs, notice_interval]) of
|
|
|
|
|
|
|
+ case emqx:get_config([slow_subs, notice_interval]) of
|
|
|
0 -> undefined;
|
|
0 -> undefined;
|
|
|
Interval ->
|
|
Interval ->
|
|
|
erlang:send_after(Interval, self(), ?FUNCTION_NAME)
|
|
erlang:send_after(Interval, self(), ?FUNCTION_NAME)
|
|
@@ -225,7 +225,7 @@ do_publish([], _, _) ->
|
|
|
ok;
|
|
ok;
|
|
|
|
|
|
|
|
do_publish(Logs, Rank, TickTime) ->
|
|
do_publish(Logs, Rank, TickTime) ->
|
|
|
- BatchSize = emqx:get_config([emqx_slow_subs, notice_batch_size]),
|
|
|
|
|
|
|
+ BatchSize = emqx:get_config([slow_subs, notice_batch_size]),
|
|
|
do_publish(Logs, BatchSize, Rank, TickTime, []).
|
|
do_publish(Logs, BatchSize, Rank, TickTime, []).
|
|
|
|
|
|
|
|
do_publish([Log | T], Size, Rank, TickTime, Cache) when Size > 0 ->
|
|
do_publish([Log | T], Size, Rank, TickTime, Cache) when Size > 0 ->
|
|
@@ -254,7 +254,7 @@ publish(TickTime, Notices) ->
|
|
|
logs => lists:reverse(Notices)},
|
|
logs => lists:reverse(Notices)},
|
|
|
Payload = emqx_json:encode(WindowLog),
|
|
Payload = emqx_json:encode(WindowLog),
|
|
|
Msg = #message{ id = emqx_guid:gen()
|
|
Msg = #message{ id = emqx_guid:gen()
|
|
|
- , qos = emqx:get_config([emqx_slow_subs, notice_qos])
|
|
|
|
|
|
|
+ , qos = emqx:get_config([slow_subs, notice_qos])
|
|
|
, from = ?MODULE
|
|
, from = ?MODULE
|
|
|
, topic = emqx_topic:systop(?NOTICE_TOPIC_NAME)
|
|
, topic = emqx_topic:systop(?NOTICE_TOPIC_NAME)
|
|
|
, payload = Payload
|
|
, payload = Payload
|
|
@@ -264,7 +264,7 @@ publish(TickTime, Notices) ->
|
|
|
ok.
|
|
ok.
|
|
|
|
|
|
|
|
load(State) ->
|
|
load(State) ->
|
|
|
- MaxSizeT = emqx:get_config([emqx_slow_subs, top_k_num]),
|
|
|
|
|
|
|
+ MaxSizeT = emqx:get_config([slow_subs, top_k_num]),
|
|
|
MaxSize = erlang:min(MaxSizeT, ?MAX_TAB_SIZE),
|
|
MaxSize = erlang:min(MaxSizeT, ?MAX_TAB_SIZE),
|
|
|
_ = emqx:hook('message.slow_subs_stats',
|
|
_ = emqx:hook('message.slow_subs_stats',
|
|
|
{?MODULE, on_stats_update, [#{max_size => MaxSize}]}
|
|
{?MODULE, on_stats_update, [#{max_size => MaxSize}]}
|
|
@@ -283,7 +283,7 @@ unload(#{notice_timer := NoticeTimer, expire_timer := ExpireTimer} = State) ->
|
|
|
|
|
|
|
|
do_clear(Logs) ->
|
|
do_clear(Logs) ->
|
|
|
Now = ?NOW,
|
|
Now = ?NOW,
|
|
|
- Interval = emqx:get_config([emqx_slow_subs, expire_interval]),
|
|
|
|
|
|
|
+ Interval = emqx:get_config([slow_subs, expire_interval]),
|
|
|
Each = fun(#top_k{index = Index, last_update_time = Ts}) ->
|
|
Each = fun(#top_k{index = Index, last_update_time = Ts}) ->
|
|
|
case Now - Ts >= Interval of
|
|
case Now - Ts >= Interval of
|
|
|
true ->
|
|
true ->
|
|
@@ -330,7 +330,7 @@ check_enable(Enable, #{enable := IsEnable} = State) ->
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
update_threshold() ->
|
|
update_threshold() ->
|
|
|
- Threshold = emqx:get_config([emqx_slow_subs, threshold]),
|
|
|
|
|
|
|
+ Threshold = emqx:get_config([slow_subs, threshold]),
|
|
|
emqx_message_latency_stats:update_threshold(Threshold),
|
|
emqx_message_latency_stats:update_threshold(Threshold),
|
|
|
ok.
|
|
ok.
|
|
|
|
|
|