|
@@ -146,7 +146,7 @@ on_message_dropped(#message{topic = Topic}, _, _) ->
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
start_link() ->
|
|
start_link() ->
|
|
|
- Opts = emqx:get_config([topic_metrics], #{}),
|
|
|
|
|
|
|
+ Opts = emqx:get_config([topic_metrics], []),
|
|
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
|
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
|
|
|
|
|
|
|
|
stop() ->
|
|
stop() ->
|
|
@@ -198,7 +198,7 @@ init([Opts]) ->
|
|
|
ok = emqx_tables:new(?TAB, [{read_concurrency, true}]),
|
|
ok = emqx_tables:new(?TAB, [{read_concurrency, true}]),
|
|
|
erlang:send_after(timer:seconds(?TICKING_INTERVAL), self(), ticking),
|
|
erlang:send_after(timer:seconds(?TICKING_INTERVAL), self(), ticking),
|
|
|
Fun =
|
|
Fun =
|
|
|
- fun(Topic, CurrentSpeeds) ->
|
|
|
|
|
|
|
+ fun(#{topic := Topic}, CurrentSpeeds) ->
|
|
|
case do_register(Topic, CurrentSpeeds) of
|
|
case do_register(Topic, CurrentSpeeds) of
|
|
|
{ok, NSpeeds} ->
|
|
{ok, NSpeeds} ->
|
|
|
NSpeeds;
|
|
NSpeeds;
|
|
@@ -208,7 +208,7 @@ init([Opts]) ->
|
|
|
error("max topic metrics quota exceeded")
|
|
error("max topic metrics quota exceeded")
|
|
|
end
|
|
end
|
|
|
end,
|
|
end,
|
|
|
- {ok, #state{speeds = lists:foldl(Fun, #{}, maps:get(topics, Opts, []))}, hibernate}.
|
|
|
|
|
|
|
+ {ok, #state{speeds = lists:foldl(Fun, #{}, Opts)}, hibernate}.
|
|
|
|
|
|
|
|
handle_call({register, Topic}, _From, State = #state{speeds = Speeds}) ->
|
|
handle_call({register, Topic}, _From, State = #state{speeds = Speeds}) ->
|
|
|
case do_register(Topic, Speeds) of
|
|
case do_register(Topic, Speeds) of
|
|
@@ -348,16 +348,15 @@ format({Topic, Data}) ->
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
remove_topic_config(Topic) when is_binary(Topic) ->
|
|
remove_topic_config(Topic) when is_binary(Topic) ->
|
|
|
- Topics = emqx_config:get_raw([<<"topic_metrics">>, <<"topics">>], []) -- [Topic],
|
|
|
|
|
|
|
+ Topics = emqx_config:get_raw([<<"topic_metrics">>], []) -- [#{<<"topic">> => Topic}],
|
|
|
update_config(Topics).
|
|
update_config(Topics).
|
|
|
|
|
|
|
|
add_topic_config(Topic) when is_binary(Topic) ->
|
|
add_topic_config(Topic) when is_binary(Topic) ->
|
|
|
- Topics = emqx_config:get_raw([<<"topic_metrics">>, <<"topics">>], []) ++ [Topic],
|
|
|
|
|
- update_config(Topics).
|
|
|
|
|
|
|
+ Topics = emqx_config:get_raw([<<"topic_metrics">>], []) ++ [#{<<"topic">> => Topic}],
|
|
|
|
|
+ update_config(lists:usort(Topics)).
|
|
|
|
|
|
|
|
update_config(Topics) when is_list(Topics) ->
|
|
update_config(Topics) when is_list(Topics) ->
|
|
|
- Opts = emqx_config:get_raw([<<"topic_metrics">>], #{}),
|
|
|
|
|
- {ok, _} = emqx:update_config([topic_metrics], maps:put(<<"topics">>, Topics, Opts)),
|
|
|
|
|
|
|
+ {ok, _} = emqx:update_config([topic_metrics], Topics),
|
|
|
ok.
|
|
ok.
|
|
|
|
|
|
|
|
try_inc(Topic, Metric) ->
|
|
try_inc(Topic, Metric) ->
|