فهرست منبع

Merge pull request #9363 from zhongwencool/statsd-update-api

refactor: emqx_statsd hot update
zhongwencool 3 سال پیش
والد
کامیت
966e6ddeeb

+ 6 - 0
apps/emqx_statsd/i18n/emqx_statsd_schema_i18n.conf

@@ -45,6 +45,12 @@ emqx_statsd_schema {
       zh: """指标的推送间隔。"""
     }
   }
+  tags {
+    desc {
+      en: """The tags for metrics."""
+      zh: """指标的标签。"""
+    }
+  }
 
   enable {
     desc {

+ 1 - 4
apps/emqx_statsd/include/emqx_statsd.hrl

@@ -1,5 +1,2 @@
 -define(APP, emqx_statsd).
--define(DEFAULT_SAMPLE_TIME_INTERVAL, 10000).
--define(DEFAULT_FLUSH_TIME_INTERVAL, 10000).
--define(DEFAULT_HOST, "127.0.0.1").
--define(DEFAULT_PORT, 8125).
+-define(STATSD, [statsd]).

+ 2 - 2
apps/emqx_statsd/src/emqx_statsd.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_statsd, [
-    {description, "An OTP application"},
-    {vsn, "5.0.2"},
+    {description, "EMQX Statsd"},
+    {vsn, "5.0.3"},
     {registered, []},
     {mod, {emqx_statsd_app, []}},
     {applications, [

+ 40 - 76
apps/emqx_statsd/src/emqx_statsd.erl

@@ -28,18 +28,17 @@
 -include_lib("emqx/include/logger.hrl").
 
 -export([
-    update/1,
     start/0,
     stop/0,
     restart/0,
-    %% for rpc
+    %% for rpc: remove after 5.1.x
     do_start/0,
     do_stop/0,
     do_restart/0
 ]).
 
 %% Interface
--export([start_link/1]).
+-export([start_link/0]).
 
 %% Internal Exports
 -export([
@@ -51,40 +50,15 @@
     terminate/2
 ]).
 
--record(state, {
-    timer :: reference() | undefined,
-    sample_time_interval :: pos_integer(),
-    flush_time_interval :: pos_integer(),
-    estatsd_pid :: pid()
-}).
-
-update(Config) ->
-    case
-        emqx_conf:update(
-            [statsd],
-            Config,
-            #{rawconf_with_defaults => true, override_to => cluster}
-        )
-    of
-        {ok, #{raw_config := NewConfigRows}} ->
-            ok = stop(),
-            case maps:get(<<"enable">>, Config, true) of
-                true ->
-                    ok = restart();
-                false ->
-                    ok = stop()
-            end,
-            {ok, NewConfigRows};
-        {error, Reason} ->
-            {error, Reason}
-    end.
+-define(SAMPLE_TIMEOUT, sample_timeout).
 
+%% Remove after 5.1.x
 start() -> check_multicall_result(emqx_statsd_proto_v1:start(mria_mnesia:running_nodes())).
 stop() -> check_multicall_result(emqx_statsd_proto_v1:stop(mria_mnesia:running_nodes())).
 restart() -> check_multicall_result(emqx_statsd_proto_v1:restart(mria_mnesia:running_nodes())).
 
 do_start() ->
-    emqx_statsd_sup:ensure_child_started(?APP, emqx_conf:get([statsd], #{})).
+    emqx_statsd_sup:ensure_child_started(?APP).
 
 do_stop() ->
     emqx_statsd_sup:ensure_child_stopped(?APP).
@@ -94,59 +68,51 @@ do_restart() ->
     ok = do_start(),
     ok.
 
-start_link(Opts) ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
-init([Opts]) ->
+init([]) ->
     process_flag(trap_exit, true),
-    Tags = tags(maps:get(tags, Opts, #{})),
-    {Host, Port} = maps:get(server, Opts, {?DEFAULT_HOST, ?DEFAULT_PORT}),
-    Opts1 = maps:without(
-        [
-            sample_time_interval,
-            flush_time_interval
-        ],
-        Opts#{
-            tags => Tags,
-            host => Host,
-            port => Port,
-            prefix => <<"emqx">>
-        }
-    ),
-    {ok, Pid} = estatsd:start_link(maps:to_list(Opts1)),
-    SampleTimeInterval = maps:get(sample_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL),
-    FlushTimeInterval = maps:get(flush_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL),
+    #{
+        tags := TagsRaw,
+        server := {Host, Port},
+        sample_time_interval := SampleTimeInterval,
+        flush_time_interval := FlushTimeInterval
+    } = emqx_conf:get([statsd]),
+    Tags = maps:fold(fun(K, V, Acc) -> [{to_bin(K), to_bin(V)} | Acc] end, [], TagsRaw),
+    Opts = [{tags, Tags}, {host, Host}, {port, Port}, {prefix, <<"emqx">>}],
+    {ok, Pid} = estatsd:start_link(Opts),
     {ok,
-        ensure_timer(#state{
-            sample_time_interval = SampleTimeInterval,
-            flush_time_interval = FlushTimeInterval,
-            estatsd_pid = Pid
+        ensure_timer(#{
+            sample_time_interval => SampleTimeInterval,
+            flush_time_interval => FlushTimeInterval,
+            estatsd_pid => Pid
         })}.
 
 handle_call(_Req, _From, State) ->
-    {noreply, State}.
+    {reply, ignore, State}.
 
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
 handle_info(
-    {timeout, Ref, sample_timeout},
-    State = #state{
-        sample_time_interval = SampleTimeInterval,
-        flush_time_interval = FlushTimeInterval,
-        estatsd_pid = Pid,
-        timer = Ref
+    {timeout, Ref, ?SAMPLE_TIMEOUT},
+    State = #{
+        sample_time_interval := SampleTimeInterval,
+        flush_time_interval := FlushTimeInterval,
+        estatsd_pid := Pid,
+        timer := Ref
     }
 ) ->
     Metrics = emqx_metrics:all() ++ emqx_stats:getstats() ++ emqx_vm_data(),
     SampleRate = SampleTimeInterval / FlushTimeInterval,
     StatsdMetrics = [
-        {gauge, trans_metrics_name(Name), Value, SampleRate, []}
+        {gauge, Name, Value, SampleRate, []}
      || {Name, Value} <- Metrics
     ],
-    estatsd:submit(Pid, StatsdMetrics),
-    {noreply, ensure_timer(State)};
-handle_info({'EXIT', Pid, Error}, State = #state{estatsd_pid = Pid}) ->
+    ok = estatsd:submit(Pid, StatsdMetrics),
+    {noreply, ensure_timer(State), hibernate};
+handle_info({'EXIT', Pid, Error}, State = #{estatsd_pid := Pid}) ->
     {stop, {shutdown, Error}, State};
 handle_info(_Msg, State) ->
     {noreply, State}.
@@ -154,16 +120,13 @@ handle_info(_Msg, State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-terminate(_Reason, #state{estatsd_pid = Pid}) ->
+terminate(_Reason, #{estatsd_pid := Pid}) ->
     estatsd:stop(Pid),
     ok.
 
 %%------------------------------------------------------------------------------
 %% Internal function
 %%------------------------------------------------------------------------------
-trans_metrics_name(Name) ->
-    Name0 = atom_to_binary(Name, utf8),
-    binary_to_atom(<<"emqx.", Name0/binary>>, utf8).
 
 emqx_vm_data() ->
     Idle =
@@ -179,12 +142,8 @@ emqx_vm_data() ->
         {cpu_use, 100 - Idle}
     ] ++ emqx_vm:mem_info().
 
-tags(Map) ->
-    Tags = maps:to_list(Map),
-    [{atom_to_binary(Key, utf8), Value} || {Key, Value} <- Tags].
-
-ensure_timer(State = #state{sample_time_interval = SampleTimeInterval}) ->
-    State#state{timer = emqx_misc:start_timer(SampleTimeInterval, sample_timeout)}.
+ensure_timer(State = #{sample_time_interval := SampleTimeInterval}) ->
+    State#{timer => emqx_misc:start_timer(SampleTimeInterval, ?SAMPLE_TIMEOUT)}.
 
 check_multicall_result({Results, []}) ->
     case
@@ -201,3 +160,8 @@ check_multicall_result({Results, []}) ->
     end;
 check_multicall_result({_, _}) ->
     error(multicall_failed).
+
+to_bin(B) when is_binary(B) -> B;
+to_bin(I) when is_integer(I) -> integer_to_binary(I);
+to_bin(L) when is_list(L) -> list_to_binary(L);
+to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).

+ 5 - 4
apps/emqx_statsd/src/emqx_statsd_api.erl

@@ -77,15 +77,16 @@ statsd_config_schema() ->
 statsd_example() ->
     #{
         enable => true,
-        flush_time_interval => "32s",
-        sample_time_interval => "32s",
-        server => "127.0.0.1:8125"
+        flush_time_interval => "30s",
+        sample_time_interval => "30s",
+        server => "127.0.0.1:8125",
+        tags => #{}
     }.
 
 statsd(get, _Params) ->
     {200, emqx:get_raw_config([<<"statsd">>], #{})};
 statsd(put, #{body := Body}) ->
-    case emqx_statsd:update(Body) of
+    case emqx_statsd_config:update(Body) of
         {ok, NewConfig} ->
             {200, NewConfig};
         {error, Reason} ->

+ 2 - 9
apps/emqx_statsd/src/emqx_statsd_app.erl

@@ -27,15 +27,8 @@
 
 start(_StartType, _StartArgs) ->
     {ok, Sup} = emqx_statsd_sup:start_link(),
-    maybe_enable_statsd(),
+    emqx_statsd_config:add_handler(),
     {ok, Sup}.
 stop(_) ->
+    emqx_statsd_config:remove_handler(),
     ok.
-
-maybe_enable_statsd() ->
-    case emqx_conf:get([statsd, enable], false) of
-        true ->
-            emqx_statsd_sup:ensure_child_started(?APP, emqx_conf:get([statsd], #{}));
-        false ->
-            ok
-    end.

+ 54 - 0
apps/emqx_statsd/src/emqx_statsd_config.erl

@@ -0,0 +1,54 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_statsd_config).
+
+-behaviour(emqx_config_handler).
+
+-include("emqx_statsd.hrl").
+
+-export([add_handler/0, remove_handler/0]).
+-export([post_config_update/5]).
+-export([update/1]).
+
+update(Config) ->
+    case
+        emqx_conf:update(
+            ?STATSD,
+            Config,
+            #{rawconf_with_defaults => true, override_to => cluster}
+        )
+    of
+        {ok, #{raw_config := NewConfigRows}} ->
+            {ok, NewConfigRows};
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+add_handler() ->
+    ok = emqx_config_handler:add_handler(?STATSD, ?MODULE),
+    ok.
+
+remove_handler() ->
+    ok = emqx_config_handler:remove_handler(?STATSD),
+    ok.
+
+post_config_update(?STATSD, _Req, #{enable := true}, _Old, _AppEnvs) ->
+    emqx_statsd_sup:ensure_child_stopped(?APP),
+    emqx_statsd_sup:ensure_child_started(?APP);
+post_config_update(?STATSD, _Req, #{enable := false}, _Old, _AppEnvs) ->
+    emqx_statsd_sup:ensure_child_stopped(?APP);
+post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) ->
+    ok.

+ 31 - 4
apps/emqx_statsd/src/emqx_statsd_schema.erl

@@ -25,7 +25,8 @@
     namespace/0,
     roots/0,
     fields/1,
-    desc/1
+    desc/1,
+    validations/0
 ]).
 
 namespace() -> "statsd".
@@ -45,7 +46,8 @@ fields("statsd") ->
             )},
         {server, fun server/1},
         {sample_time_interval, fun sample_interval/1},
-        {flush_time_interval, fun flush_interval/1}
+        {flush_time_interval, fun flush_interval/1},
+        {tags, fun tags/1}
     ].
 
 desc("statsd") -> ?DESC(statsd);
@@ -59,12 +61,37 @@ server(_) -> undefined.
 
 sample_interval(type) -> emqx_schema:duration_ms();
 sample_interval(required) -> true;
-sample_interval(default) -> "10s";
+sample_interval(default) -> "30s";
 sample_interval(desc) -> ?DESC(?FUNCTION_NAME);
 sample_interval(_) -> undefined.
 
 flush_interval(type) -> emqx_schema:duration_ms();
 flush_interval(required) -> true;
-flush_interval(default) -> "10s";
+flush_interval(default) -> "30s";
 flush_interval(desc) -> ?DESC(?FUNCTION_NAME);
 flush_interval(_) -> undefined.
+
+tags(type) -> map();
+tags(required) -> false;
+tags(default) -> #{};
+tags(desc) -> ?DESC(?FUNCTION_NAME);
+tags(_) -> undefined.
+
+validations() ->
+    [
+        {check_interval, fun check_interval/1}
+    ].
+
+check_interval(Conf) ->
+    case hocon_maps:get("statsd.sample_time_interval", Conf) of
+        undefined ->
+            ok;
+        Sample ->
+            Flush = hocon_maps:get("statsd.flush_time_interval", Conf),
+            case Sample =< Flush of
+                true ->
+                    true;
+                false ->
+                    {bad_interval, #{sample_time_interval => Sample, flush_time_interval => Flush}}
+            end
+    end.

+ 10 - 11
apps/emqx_statsd/src/emqx_statsd_sup.erl

@@ -10,7 +10,6 @@
 -export([
     start_link/0,
     ensure_child_started/1,
-    ensure_child_started/2,
     ensure_child_stopped/1
 ]).
 
@@ -19,7 +18,7 @@
 %% Helper macro for declaring children of supervisor
 -define(CHILD(Mod, Opts), #{
     id => Mod,
-    start => {Mod, start_link, [Opts]},
+    start => {Mod, start_link, Opts},
     restart => permanent,
     shutdown => 5000,
     type => worker,
@@ -29,13 +28,9 @@
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
--spec ensure_child_started(supervisor:child_spec()) -> ok.
-ensure_child_started(ChildSpec) when is_map(ChildSpec) ->
-    assert_started(supervisor:start_child(?MODULE, ChildSpec)).
-
--spec ensure_child_started(atom(), map()) -> ok.
-ensure_child_started(Mod, Opts) when is_atom(Mod) andalso is_map(Opts) ->
-    assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Opts))).
+-spec ensure_child_started(atom()) -> ok.
+ensure_child_started(Mod) when is_atom(Mod) ->
+    assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, []))).
 
 %% @doc Stop the child worker process.
 -spec ensure_child_stopped(any()) -> ok.
@@ -50,13 +45,17 @@ ensure_child_stopped(ChildId) ->
     end.
 
 init([]) ->
-    {ok, {{one_for_one, 10, 3600}, []}}.
+    Children =
+        case emqx_conf:get([statsd, enable], false) of
+            true -> [?CHILD(emqx_statsd, [])];
+            false -> []
+        end,
+    {ok, {{one_for_one, 100, 3600}, Children}}.
 
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------
 
 assert_started({ok, _Pid}) -> ok;
-assert_started({ok, _Pid, _Info}) -> ok;
 assert_started({error, {already_started, _Pid}}) -> ok;
 assert_started({error, Reason}) -> erlang:error(Reason).

+ 83 - 7
apps/emqx_statsd/test/emqx_statsd_SUITE.erl

@@ -5,28 +5,104 @@
 
 -include_lib("common_test/include/ct.hrl").
 -include_lib("eunit/include/eunit.hrl").
+-import(emqx_dashboard_api_test_helpers, [request/3, uri/1]).
+
+-define(BASE_CONF, <<
+    "\n"
+    "statsd {\n"
+    "enable = true\n"
+    "flush_time_interval = 4s\n"
+    "sample_time_interval = 4s\n"
+    "server = \"127.0.0.1:8126\"\n"
+    "tags {\"t1\" = \"good\", test = 100}\n"
+    "}\n"
+>>).
 
 init_per_suite(Config) ->
-    emqx_common_test_helpers:start_apps([emqx_statsd]),
+    emqx_common_test_helpers:start_apps(
+        [emqx_conf, emqx_dashboard, emqx_statsd],
+        fun set_special_configs/1
+    ),
+    ok = emqx_common_test_helpers:load_config(emqx_statsd_schema, ?BASE_CONF, #{
+        raw_with_default => true
+    }),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_common_test_helpers:stop_apps([emqx_statsd]).
+    emqx_common_test_helpers:stop_apps([emqx_statsd, emqx_dashboard, emqx_conf]).
+
+set_special_configs(emqx_dashboard) ->
+    emqx_dashboard_api_test_helpers:set_default_config();
+set_special_configs(_) ->
+    ok.
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 t_statsd(_) ->
-    {ok, Socket} = gen_udp:open(8125),
+    {ok, Socket} = gen_udp:open(8126, [{active, true}]),
     receive
-        {udp, _Socket, _Host, _Port, Bin} ->
-            ?assert(length(Bin) > 50)
-    after 11 * 1000 ->
-        ?assert(true, failed)
+        {udp, Socket1, Host, Port, Data} ->
+            ct:pal("receive:~p~n", [{Socket, Socket1, Host, Port}]),
+            ?assert(length(Data) > 50),
+            ?assert(nomatch =/= string:find(Data, "\nemqx.cpu_use:"))
+    after 10 * 1000 ->
+        error(timeout)
     end,
     gen_udp:close(Socket).
 
 t_management(_) ->
     ?assertMatch(ok, emqx_statsd:start()),
+    ?assertMatch(ok, emqx_statsd:start()),
+    ?assertMatch(ok, emqx_statsd:stop()),
     ?assertMatch(ok, emqx_statsd:stop()),
     ?assertMatch(ok, emqx_statsd:restart()).
+
+t_rest_http(_) ->
+    {ok, Res0} = request(get),
+    ?assertEqual(
+        #{
+            <<"enable">> => true,
+            <<"flush_time_interval">> => <<"4s">>,
+            <<"sample_time_interval">> => <<"4s">>,
+            <<"server">> => <<"127.0.0.1:8126">>,
+            <<"tags">> => #{<<"t1">> => <<"good">>, <<"test">> => 100}
+        },
+        Res0
+    ),
+    {ok, Res1} = request(put, #{enable => false}),
+    ?assertMatch(#{<<"enable">> := false}, Res1),
+    ?assertEqual(maps:remove(<<"enable">>, Res0), maps:remove(<<"enable">>, Res1)),
+    {ok, Res2} = request(get),
+    ?assertEqual(Res1, Res2),
+    ?assertEqual(
+        error, request(put, #{sample_time_interval => "11s", flush_time_interval => "10s"})
+    ),
+    {ok, _} = request(put, #{enable => true}),
+    ok.
+
+t_kill_exit(_) ->
+    {ok, _} = request(put, #{enable => true}),
+    Pid = erlang:whereis(emqx_statsd),
+    ?assertEqual(ignore, gen_server:call(Pid, whatever)),
+    ?assertEqual(ok, gen_server:cast(Pid, whatever)),
+    ?assertEqual(Pid, erlang:whereis(emqx_statsd)),
+    #{estatsd_pid := Estatsd} = sys:get_state(emqx_statsd),
+    ?assert(erlang:exit(Estatsd, kill)),
+    ?assertEqual(false, is_process_alive(Estatsd)),
+    ct:sleep(150),
+    Pid1 = erlang:whereis(emqx_statsd),
+    ?assertNotEqual(Pid, Pid1),
+    #{estatsd_pid := Estatsd1} = sys:get_state(emqx_statsd),
+    ?assertNotEqual(Estatsd, Estatsd1),
+    ok.
+
+request(Method) -> request(Method, []).
+
+request(Method, Body) ->
+    case request(Method, uri(["statsd"]), Body) of
+        {ok, 200, Res} ->
+            {ok, emqx_json:decode(Res, [return_maps])};
+        {ok, _Status, _} ->
+            error
+    end.

+ 3 - 0
changes/v5.0.11-en.md

@@ -15,8 +15,11 @@
   automatically if needed. Use `PUT /gateways/{name}/enable/{true|false}` to
   enable or disable gateway. No more `DELETE /gateways/{name}`.
 
+- Support `statsd {tags: {"user-defined-tag" = "tag-value"}` configure and improve stability of `emqx_statsd` [#9363](http://github.com/emqx/emqx/pull/9363).
+
 - Improve node name generation rules to avoid potential atom table overflow risk [#9387](https://github.com/emqx/emqx/pull/9387).
 
+
 ## Bug fixes
 
 - Fix create trace sometime failed by end_at time has already passed. [#9303](https://github.com/emqx/emqx/pull/9303)

+ 2 - 0
changes/v5.0.11-zh.md

@@ -13,6 +13,8 @@
 - 重新设计了 /gateways API [9364](https://github.com/emqx/emqx/pull/9364)。
   使用 PUT /gateways/{name} 代替了 POST /gateways,现在网关将在需要时自动加载,然后删除了 DELETE /gateways/{name},之后可以使用 PUT /gateways/{name}/enable/{true|false} 来开启或禁用网关。
 
+- 支持 `statsd {tags: {"user-defined-tag" = "tag-value"}` 配置,并提升 `emqx_statsd` 的稳定性 [#9363](http://github.com/emqx/emqx/pull/9363)。
+
 - 改进了节点名称生成规则,以避免潜在的原子表溢出风险 [#9387](https://github.com/emqx/emqx/pull/9387)。
 
 ## 修复