Explorar el Código

Merge pull request #7369 from thalesmg/telemetry-revamp-part2

Telemetry revamp part2
Thales Macedo Garitezi hace 3 años
padre
commit
7b6a71c117

+ 37 - 0
apps/emqx_modules/src/emqx_modules.erl

@@ -0,0 +1,37 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_modules).
+
+-export([
+    get_advanced_mqtt_features_in_use/0,
+    set_advanced_mqtt_features_in_use/1
+]).
+
+-type advanced_mqtt_feature() :: delayed | topic_rewrite | retained | auto_subscribe.
+-type advanced_mqtt_features_in_use() :: #{advanced_mqtt_feature() => boolean()}.
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+-spec get_advanced_mqtt_features_in_use() -> advanced_mqtt_features_in_use().
+get_advanced_mqtt_features_in_use() ->
+    application:get_env(?MODULE, advanced_mqtt_features_in_use, #{}).
+
+-spec set_advanced_mqtt_features_in_use(advanced_mqtt_features_in_use()) -> ok.
+set_advanced_mqtt_features_in_use(Features) ->
+    application:set_env(?MODULE, advanced_mqtt_features_in_use, Features).

+ 15 - 1
apps/emqx_modules/src/emqx_modules_app.erl

@@ -33,7 +33,21 @@ stop(_State) ->
     ok.
 
 maybe_enable_modules() ->
-    emqx_conf:get([delayed, enable], true) andalso emqx_delayed:enable(),
+    DelayedEnabled = emqx_conf:get([delayed, enable], true),
+    RewriteEnabled = length(emqx_conf:get([rewrite], [])) > 0,
+    RetainerEnabled = emqx_conf:get([retainer, enable], false),
+    AutoSubscribeEnabled = length(emqx_conf:get([auto_subscribe, topics], [])) > 0,
+    application:set_env(
+        emqx_modules,
+        advanced_mqtt_features_in_use,
+        #{
+            delayed => DelayedEnabled,
+            topic_rewrite => RewriteEnabled,
+            retained => RetainerEnabled,
+            auto_subscribe => AutoSubscribeEnabled
+        }
+    ),
+    DelayedEnabled andalso emqx_delayed:enable(),
     emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:enable(),
     emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:enable(),
     emqx_conf_cli:load(),

+ 83 - 23
apps/emqx_modules/src/emqx_telemetry.erl

@@ -76,10 +76,13 @@
 -record(state, {
     uuid :: undefined | binary(),
     url :: string(),
-    report_interval :: undefined | non_neg_integer(),
-    timer = undefined :: undefined | reference()
+    report_interval :: non_neg_integer(),
+    timer = undefined :: undefined | reference(),
+    previous_metrics = #{} :: map()
 }).
 
+-type state() :: #state{}.
+
 %% The count of 100-nanosecond intervals between the UUID epoch
 %% 1582-10-15 00:00:00 and the UNIX epoch 1970-01-01 00:00:00.
 -define(GREGORIAN_EPOCH_OFFSET, 16#01b21dd213814000).
@@ -136,6 +139,7 @@ get_telemetry() ->
 %% is very small, it should be safe to ignore.
 -dialyzer([{nowarn_function, [init/1]}]).
 init(_Opts) ->
+    State0 = empty_state(),
     UUID1 =
         case mnesia:dirty_read(?TELEMETRY, ?UNIQUE_ID) of
             [] ->
@@ -148,19 +152,15 @@ init(_Opts) ->
             [#telemetry{uuid = UUID} | _] ->
                 UUID
         end,
-    {ok, #state{
-        url = ?TELEMETRY_URL,
-        report_interval = timer:seconds(?REPORT_INTERVAL),
-        uuid = UUID1
-    }}.
+    {ok, State0#state{uuid = UUID1}}.
 
-handle_call(enable, _From, State) ->
+handle_call(enable, _From, State0) ->
     case ?MODULE:official_version(emqx_app:get_release()) of
         true ->
-            report_telemetry(State),
+            State = report_telemetry(State0),
             {reply, ok, ensure_report_timer(State)};
         false ->
-            {reply, {error, not_official_version}, State}
+            {reply, {error, not_official_version}, State0}
     end;
 handle_call(disable, _From, State = #state{timer = Timer}) ->
     case ?MODULE:official_version(emqx_app:get_release()) of
@@ -173,7 +173,8 @@ handle_call(disable, _From, State = #state{timer = Timer}) ->
 handle_call(get_uuid, _From, State = #state{uuid = UUID}) ->
     {reply, {ok, UUID}, State};
 handle_call(get_telemetry, _From, State) ->
-    {reply, {ok, get_telemetry(State)}, State};
+    {_State, Telemetry} = get_telemetry(State),
+    {reply, {ok, Telemetry}, State};
 handle_call(Req, _From, State) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
     {reply, ignored, State}.
@@ -186,11 +187,12 @@ handle_continue(Continue, State) ->
     ?SLOG(error, #{msg => "unexpected_continue", continue => Continue}),
     {noreply, State}.
 
-handle_info({timeout, TRef, time_to_report_telemetry_data}, State = #state{timer = TRef}) ->
-    case get_status() of
-        true -> report_telemetry(State);
-        false -> ok
-    end,
+handle_info({timeout, TRef, time_to_report_telemetry_data}, State0 = #state{timer = TRef}) ->
+    State =
+        case get_status() of
+            true -> report_telemetry(State0);
+            false -> State0
+        end,
     {noreply, ensure_report_timer(State)};
 handle_info(Info, State) ->
     ?SLOG(error, #{msg => "unexpected_info", info => Info}),
@@ -307,6 +309,9 @@ messages_sent() ->
 messages_received() ->
     emqx_metrics:val('messages.received').
 
+topic_count() ->
+    emqx_stats:getstat('topics.count').
+
 generate_uuid() ->
     MicroSeconds = erlang:system_time(microsecond),
     Timestamp = MicroSeconds * 10 + ?GREGORIAN_EPOCH_OFFSET,
@@ -323,9 +328,11 @@ generate_uuid() ->
         )
     ).
 
-get_telemetry(#state{uuid = UUID}) ->
+-spec get_telemetry(state()) -> {state(), proplists:proplist()}.
+get_telemetry(State0 = #state{uuid = UUID}) ->
     OSInfo = os_info(),
-    [
+    {MQTTRTInsights, State} = mqtt_runtime_insights(State0),
+    {State, [
         {emqx_version, bin(emqx_app:get_release())},
         {license, [{edition, <<"community">>}]},
         {os_name, bin(get_value(os_name, OSInfo))},
@@ -339,11 +346,13 @@ get_telemetry(#state{uuid = UUID}) ->
         {messages_received, messages_received()},
         {messages_sent, messages_sent()},
         {build_info, build_info()},
-        {vm_specs, vm_specs()}
-    ].
+        {vm_specs, vm_specs()},
+        {mqtt_runtime_insights, MQTTRTInsights},
+        {advanced_mqtt_features, advanced_mqtt_features()}
+    ]}.
 
-report_telemetry(State = #state{url = URL}) ->
-    Data = get_telemetry(State),
+report_telemetry(State0 = #state{url = URL}) ->
+    {State, Data} = get_telemetry(State0),
     case emqx_json:safe_encode(Data) of
         {ok, Bin} ->
             httpc_request(post, URL, [], Bin),
@@ -351,7 +360,8 @@ report_telemetry(State = #state{url = URL}) ->
         {error, Reason} ->
             %% debug? why?
             ?tp(debug, telemetry_data_encode_error, #{data => Data, reason => Reason})
-    end.
+    end,
+    State.
 
 httpc_request(Method, URL, Headers, Body) ->
     HTTPOptions = [{timeout, 10_000}],
@@ -401,9 +411,59 @@ vm_specs() ->
         {total_memory, proplists:get_value(available_memory, SysMemData)}
     ].
 
+-spec mqtt_runtime_insights(state()) -> {map(), state()}.
+mqtt_runtime_insights(State0) ->
+    {MQTTRates, State} = update_mqtt_rates(State0),
+    MQTTRTInsights = MQTTRates#{num_topics => topic_count()},
+    {MQTTRTInsights, State}.
+
+-spec update_mqtt_rates(state()) -> {map(), state()}.
+update_mqtt_rates(
+    State = #state{
+        previous_metrics = PrevMetrics0,
+        report_interval = ReportInterval
+    }
+) when
+    is_integer(ReportInterval), ReportInterval > 0
+->
+    MetricsToCheck =
+        [
+            {messages_sent_rate, messages_sent, fun messages_sent/0},
+            {messages_received_rate, messages_received, fun messages_received/0}
+        ],
+    {Metrics, PrevMetrics} =
+        lists:foldl(
+            fun({RateKey, CountKey, Fun}, {Rates0, PrevMetrics1}) ->
+                NewCount = Fun(),
+                OldCount = maps:get(CountKey, PrevMetrics1, 0),
+                Rate = (NewCount - OldCount) / ReportInterval,
+                Rates = Rates0#{RateKey => Rate},
+                PrevMetrics2 = PrevMetrics1#{CountKey => NewCount},
+                {Rates, PrevMetrics2}
+            end,
+            {#{}, PrevMetrics0},
+            MetricsToCheck
+        ),
+    {Metrics, State#state{previous_metrics = PrevMetrics}};
+update_mqtt_rates(State) ->
+    {#{}, State}.
+
+advanced_mqtt_features() ->
+    AdvancedFeatures = emqx_modules:get_advanced_mqtt_features_in_use(),
+    maps:map(fun(_K, V) -> bool2int(V) end, AdvancedFeatures).
+
 bin(L) when is_list(L) ->
     list_to_binary(L);
 bin(A) when is_atom(A) ->
     atom_to_binary(A);
 bin(B) when is_binary(B) ->
     B.
+
+bool2int(true) -> 1;
+bool2int(false) -> 0.
+
+empty_state() ->
+    #state{
+        url = ?TELEMETRY_URL,
+        report_interval = timer:seconds(?REPORT_INTERVAL)
+    }.

+ 109 - 4
apps/emqx_modules/test/emqx_telemetry_SUITE.erl

@@ -29,11 +29,11 @@ all() -> emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
     snabbkaffe:fix_ct_logging(),
-    emqx_common_test_helpers:start_apps([emqx_modules]),
+    emqx_common_test_helpers:start_apps([emqx_conf, emqx_modules]),
     Config.
 
 end_per_suite(_Config) ->
-    emqx_common_test_helpers:stop_apps([emqx_modules]).
+    emqx_common_test_helpers:stop_apps([emqx_conf, emqx_modules]).
 
 init_per_testcase(t_get_telemetry, Config) ->
     DataDir = ?config(data_dir, Config),
@@ -66,10 +66,21 @@ init_per_testcase(t_get_telemetry, Config) ->
         end
     ),
     Config;
+init_per_testcase(t_advanced_mqtt_features, Config) ->
+    OldValues = emqx_modules:get_advanced_mqtt_features_in_use(),
+    emqx_modules:set_advanced_mqtt_features_in_use(#{
+        delayed => false,
+        topic_rewrite => false,
+        retained => false,
+        auto_subscribe => false
+    }),
+    [{old_values, OldValues} | Config];
 init_per_testcase(_Testcase, Config) ->
     TestPID = self(),
     ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]),
-    ok = meck:expect(httpc, request, fun(Method, URL, Headers, Body) ->
+    ok = meck:expect(httpc, request, fun(
+        Method, {URL, Headers, _ContentType, Body}, _HTTPOpts, _Opts
+    ) ->
         TestPID ! {request, Method, URL, Headers, Body}
     end),
     Config.
@@ -77,6 +88,9 @@ init_per_testcase(_Testcase, Config) ->
 end_per_testcase(t_get_telemetry, _Config) ->
     meck:unload([httpc, emqx_telemetry]),
     ok;
+end_per_testcase(t_advanced_mqtt_features, Config) ->
+    OldValues = ?config(old_values, Config),
+    emqx_modules:set_advanced_mqtt_features_in_use(OldValues);
 end_per_testcase(_Testcase, _Config) ->
     meck:unload([httpc]),
     ok.
@@ -134,6 +148,39 @@ t_get_telemetry(_Config) ->
     ?assert(0 =< get_value(num_cpus, VMSpecs)),
     ?assert(is_integer(get_value(total_memory, VMSpecs))),
     ?assert(0 =< get_value(total_memory, VMSpecs)),
+    MQTTRTInsights = get_value(mqtt_runtime_insights, TelemetryData),
+    ?assert(is_number(maps:get(messages_sent_rate, MQTTRTInsights))),
+    ?assert(is_number(maps:get(messages_received_rate, MQTTRTInsights))),
+    ?assert(is_integer(maps:get(num_topics, MQTTRTInsights))),
+    ok.
+
+t_advanced_mqtt_features(_) ->
+    {ok, TelemetryData} = emqx_telemetry:get_telemetry(),
+    AdvFeats = get_value(advanced_mqtt_features, TelemetryData),
+    ?assertEqual(
+        #{
+            retained => 0,
+            topic_rewrite => 0,
+            auto_subscribe => 0,
+            delayed => 0
+        },
+        AdvFeats
+    ),
+    lists:foreach(
+        fun(TelemetryKey) ->
+            EnabledFeats = emqx_modules:get_advanced_mqtt_features_in_use(),
+            emqx_modules:set_advanced_mqtt_features_in_use(EnabledFeats#{TelemetryKey => true}),
+            {ok, Data} = emqx_telemetry:get_telemetry(),
+            #{TelemetryKey := Value} = get_value(advanced_mqtt_features, Data),
+            ?assertEqual(1, Value, #{key => TelemetryKey})
+        end,
+        [
+            retained,
+            topic_rewrite,
+            auto_subscribe,
+            delayed
+        ]
+    ),
     ok.
 
 t_enable(_) ->
@@ -150,12 +197,70 @@ t_send_after_enable(_) ->
     ok = snabbkaffe:start_trace(),
     try
         ok = emqx_telemetry:enable(),
-        ?assertMatch({ok, _}, ?block_until(#{?snk_kind := telemetry_data_reported}, 2000, 100))
+        ?assertMatch({ok, _}, ?block_until(#{?snk_kind := telemetry_data_reported}, 2000, 100)),
+        receive
+            {request, post, _URL, _Headers, Body} ->
+                {ok, Decoded} = emqx_json:safe_decode(Body, [return_maps]),
+                ?assertMatch(
+                    #{
+                        <<"uuid">> := _,
+                        <<"messages_received">> := _,
+                        <<"messages_sent">> := _,
+                        <<"build_info">> := #{},
+                        <<"vm_specs">> :=
+                            #{
+                                <<"num_cpus">> := _,
+                                <<"total_memory">> := _
+                            },
+                        <<"mqtt_runtime_insights">> :=
+                            #{
+                                <<"messages_received_rate">> := _,
+                                <<"messages_sent_rate">> := _,
+                                <<"num_topics">> := _
+                            },
+                        <<"advanced_mqtt_features">> :=
+                            #{
+                                <<"retained">> := _,
+                                <<"topic_rewrite">> := _,
+                                <<"auto_subscribe">> := _,
+                                <<"delayed">> := _
+                            }
+                    },
+                    Decoded
+                )
+        after 2100 ->
+            exit(telemetry_not_reported)
+        end
     after
         ok = snabbkaffe:stop(),
         meck:unload([emqx_telemetry])
     end.
 
+t_mqtt_runtime_insights(_) ->
+    State0 = emqx_telemetry:empty_state(),
+    {MQTTRTInsights1, State1} = emqx_telemetry:mqtt_runtime_insights(State0),
+    ?assertEqual(
+        #{
+            messages_sent_rate => 0.0,
+            messages_received_rate => 0.0,
+            num_topics => 0
+        },
+        MQTTRTInsights1
+    ),
+    %% add some fake stats
+    emqx_metrics:set('messages.sent', 10_000_000_000),
+    emqx_metrics:set('messages.received', 20_000_000_000),
+    emqx_stats:setstat('topics.count', 30_000),
+    {MQTTRTInsights2, _State2} = emqx_telemetry:mqtt_runtime_insights(State1),
+    assert_approximate(MQTTRTInsights2, messages_sent_rate, "16.53"),
+    assert_approximate(MQTTRTInsights2, messages_received_rate, "33.07"),
+    ?assertEqual(30_000, maps:get(num_topics, MQTTRTInsights2)),
+    ok.
+
+assert_approximate(Map, Key, Expected) ->
+    Value = maps:get(Key, Map),
+    ?assertEqual(Expected, float_to_list(Value, [{decimals, 2}])).
+
 bin(L) when is_list(L) ->
     list_to_binary(L);
 bin(B) when is_binary(B) ->