Преглед изворни кода

Merge pull request #7515 from thalesmg/telemetry-revamp-part4

feat(telemetry): add gateway info to reported data
Thales Macedo Garitezi пре 3 година
родитељ
комит
8431aecf9b

+ 61 - 0
apps/emqx_gateway/src/emqx_gateway.erl

@@ -30,6 +30,9 @@
     list/0
 ]).
 
+%% APIs For `emqx_telemetry'
+-export([get_basic_usage_info/0]).
+
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------
@@ -81,6 +84,64 @@ start(Name) ->
 stop(Name) ->
     emqx_gateway_sup:stop_gateway_insta(Name).
 
+%% @doc Expose basic info for `emqx_telemetry'.
+-spec get_basic_usage_info() ->
+    #{
+        authn => Authn,
+        num_clients => non_neg_integer(),
+        listeners => [
+            #{
+                type => atom(),
+                authn => Authn
+            }
+        ]
+    }
+when
+    Authn :: binary().
+get_basic_usage_info() ->
+    lists:foldl(
+        fun(GatewayInfo, Acc) ->
+            Config = maps:get(config, GatewayInfo, #{}),
+            GatewayType = maps:get(name, GatewayInfo),
+            GatewayAuthn = get_authn_type(Config),
+            Listeners = get_listeners(Config),
+            TabName = emqx_gateway_cm:tabname(chan, GatewayType),
+            NumClients = ets:info(TabName, size),
+            Acc#{
+                GatewayType => #{
+                    authn => GatewayAuthn,
+                    listeners => Listeners,
+                    num_clients => NumClients
+                }
+            }
+        end,
+        #{},
+        list()
+    ).
+
 %%--------------------------------------------------------------------
 %% Internal funcs
 %%--------------------------------------------------------------------
+
+get_authn_type(#{authentication := Authn = #{mechanism := Mechanism, backend := Backend}}) when
+    is_atom(Mechanism), is_atom(Backend)
+->
+    emqx_authentication_config:authenticator_id(Authn);
+get_authn_type(_) ->
+    <<"undefined">>.
+
+get_listeners(#{listeners := Listeners0}) when is_map(Listeners0) ->
+    Listeners = [
+        {ListenerType, Config}
+     || {ListenerType, Listeners1} <- maps:to_list(Listeners0),
+        {_Name, Config} <- maps:to_list(Listeners1)
+    ],
+    lists:map(
+        fun({ListenerType, ListenerConfig}) ->
+            ListenerAuthn = get_authn_type(ListenerConfig),
+            #{type => ListenerType, authn => ListenerAuthn}
+        end,
+        Listeners
+    );
+get_listeners(_) ->
+    [].

+ 82 - 1
apps/emqx_gateway/test/emqx_gateway_SUITE.erl

@@ -17,6 +17,7 @@
 -module(emqx_gateway_SUITE).
 
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
 
 -compile(export_all).
 -compile(nowarn_export_all).
@@ -37,7 +38,26 @@ init_per_suite(Conf) ->
     Conf.
 
 end_per_suite(_Conf) ->
-    emqx_common_test_helpers:stop_apps([emqx_gateway]).
+    emqx_common_test_helpers:stop_apps([emqx_gateway, emqx_authn]),
+    emqx_config:delete_override_conf_files(),
+    ok.
+
+init_per_testcase(t_get_basic_usage_info_2, Config) ->
+    DataDir = ?config(data_dir, Config),
+    emqx_common_test_helpers:stop_apps([emqx_gateway]),
+    ok = setup_fake_usage_data(DataDir),
+    Config;
+init_per_testcase(_TestCase, Config) ->
+    Config.
+
+end_per_testcase(t_get_basic_usage_info_2, _Config) ->
+    emqx_gateway_cm:unregister_channel(lwm2m, <<"client_id">>),
+    emqx_config:put([gateway], #{}),
+    emqx_common_test_helpers:stop_apps([emqx_gateway]),
+    emqx_common_test_helpers:start_apps([emqx_gateway]),
+    ok;
+end_per_testcase(_TestCase, _Config) ->
+    ok.
 
 %%--------------------------------------------------------------------
 %% cases
@@ -114,3 +134,64 @@ t_start_stop_update(_) ->
 
     {error, already_started} = emqx_gateway:start(?GWNAME),
     ok.
+
+t_get_basic_usage_info_empty(_Config) ->
+    ?assertEqual(
+        #{},
+        emqx_gateway:get_basic_usage_info()
+    ).
+
+t_get_basic_usage_info_1(_Config) ->
+    {ok, _} = emqx_gateway:load(?GWNAME, #{idle_timeout => 1000}),
+    ?assertEqual(
+        #{
+            mqttsn =>
+                #{
+                    authn => <<"undefined">>,
+                    listeners => [],
+                    num_clients => 0
+                }
+        },
+        emqx_gateway:get_basic_usage_info()
+    ).
+
+t_get_basic_usage_info_2(_Config) ->
+    ?assertEqual(
+        #{
+            lwm2m =>
+                #{
+                    authn => <<"password_based:redis">>,
+                    listeners =>
+                        [
+                            #{
+                                authn =>
+                                    <<"password_based:built_in_database">>,
+                                type => udp
+                            }
+                        ],
+                    num_clients => 1
+                }
+        },
+        emqx_gateway:get_basic_usage_info()
+    ).
+
+%%--------------------------------------------------------------------
+%% helper functions
+%%--------------------------------------------------------------------
+
+read_lwm2m_conf(DataDir) ->
+    ConfPath = filename:join([DataDir, "lwm2m.conf"]),
+    {ok, Conf} = file:read_file(ConfPath),
+    Conf.
+
+setup_fake_usage_data(Lwm2mDataDir) ->
+    XmlDir = emqx_common_test_helpers:deps_path(emqx_gateway, "src/lwm2m/lwm2m_xml"),
+    Lwm2mConf = read_lwm2m_conf(Lwm2mDataDir),
+    ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, Lwm2mConf),
+    emqx_config:put([gateway, lwm2m, xml_dir], XmlDir),
+    {ok, _} = application:ensure_all_started(emqx_gateway),
+    %% to simulate a connection
+    FakeConnInfo = #{conn_mod => fake_conn_mod},
+    FakeChanPid = self(),
+    ok = emqx_gateway_cm:register_channel(lwm2m, <<"client_id">>, FakeChanPid, FakeConnInfo),
+    ok.

+ 76 - 0
apps/emqx_gateway/test/emqx_gateway_SUITE_data/lwm2m.conf

@@ -0,0 +1,76 @@
+gateway.lwm2m {
+
+  ## How long time the connection will be disconnected if the
+  ## connection is established but no bytes received
+  idle_timeout = 30s
+
+  ## To control whether write statistics data into ETS table
+  ## for dashboard to read.
+  enable_stats = true
+
+  ## When publishing or subscribing, prefix all topics with a mountpoint string.
+  mountpoint = "lwm2m/${username}"
+
+  xml_dir = "etc/lwm2m_xml"
+
+  ##
+  ##
+  lifetime_min = 1s
+
+  lifetime_max = 86400s
+
+  qmode_time_window = 22s
+
+  auto_observe = false
+
+  ## always | contains_object_list
+  update_msg_publish_condition = contains_object_list
+
+
+  translators {
+    command  {
+      topic = "/dn/#"
+      qos = 0
+    }
+
+    response {
+      topic = "/up/resp"
+      qos = 0
+    }
+
+    notify {
+      topic = "/up/notify"
+      qos = 0
+    }
+
+    register {
+      topic = "/up/resp"
+      qos = 0
+    }
+
+    update {
+      topic = "/up/resp"
+      qos = 0
+    }
+  }
+
+  listeners.udp.default {
+      bind = 5783
+      authentication: {
+        mechanism: password_based,
+        backend: built_in_database
+      }
+  }
+
+  authentication {
+    mechanism: password_based,
+    backend: redis,
+    redis_type: single,
+    server: "localhost:6379",
+    cmd: "HMGET mqtt_user:${username} password_hash salt is_superuser",
+    password_hash_algorithm: {
+      name: plain,
+      salt_position: suffix
+    }
+  }
+}

+ 2 - 0
apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl

@@ -42,6 +42,8 @@
 all() -> emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Conf) ->
+    application:load(emqx),
+    emqx_config:delete_override_conf_files(),
     emqx_config:erase(gateway),
     emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
     emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_authn, emqx_gateway]),

+ 2 - 1
apps/emqx_gateway/test/emqx_gateway_cm_SUITE.erl

@@ -43,7 +43,8 @@ init_per_suite(Conf) ->
 
 end_per_suite(_Conf) ->
     meck:unload(emqx_gateway_metrics),
-    emqx_common_test_helpers:stop_apps([]).
+    emqx_common_test_helpers:stop_apps([]),
+    emqx_config:delete_override_conf_files().
 
 init_per_testcase(_TestCase, Conf) ->
     process_flag(trap_exit, true),

+ 2 - 1
apps/emqx_gateway/test/emqx_gateway_cm_registry_SUITE.erl

@@ -39,7 +39,8 @@ init_per_suite(Conf) ->
     Conf.
 
 end_per_suite(_Conf) ->
-    emqx_common_test_helpers:stop_apps([]).
+    emqx_common_test_helpers:stop_apps([]),
+    emqx_config:delete_override_conf_files().
 
 init_per_testcase(_TestCase, Conf) ->
     {ok, Pid} = emqx_gateway_cm_registry:start_link(?GWNAME),

+ 2 - 1
apps/emqx_gateway/test/emqx_gateway_conf_SUITE.erl

@@ -42,7 +42,8 @@ init_per_suite(Conf) ->
     Conf.
 
 end_per_suite(_Conf) ->
-    emqx_common_test_helpers:stop_apps([emqx_gateway, emqx_authn, emqx_conf]).
+    emqx_common_test_helpers:stop_apps([emqx_gateway, emqx_authn, emqx_conf]),
+    emqx_config:delete_override_conf_files().
 
 init_per_testcase(_CaseName, Conf) ->
     _ = emqx_gateway_conf:unload_gateway(stomp),

+ 3 - 1
apps/emqx_management/test/emqx_mgmt_api_test_util.erl

@@ -34,7 +34,9 @@ end_suite() ->
 
 end_suite(Apps) ->
     application:unload(emqx_management),
-    emqx_common_test_helpers:stop_apps(Apps ++ [emqx_dashboard]).
+    emqx_common_test_helpers:stop_apps(Apps ++ [emqx_dashboard]),
+    emqx_config:delete_override_conf_files(),
+    ok.
 
 set_special_configs(emqx_dashboard) ->
     Config = #{

+ 11 - 1
apps/emqx_modules/src/emqx_telemetry.erl

@@ -342,7 +342,8 @@ get_telemetry(State0 = #state{uuid = UUID}) ->
         {vm_specs, vm_specs()},
         {mqtt_runtime_insights, MQTTRTInsights},
         {advanced_mqtt_features, advanced_mqtt_features()},
-        {authn_authz, get_authn_authz_info()}
+        {authn_authz, get_authn_authz_info()},
+        {gateway, get_gateway_info()}
     ]}.
 
 report_telemetry(State0 = #state{url = URL}) ->
@@ -458,6 +459,15 @@ get_authn_authz_info() ->
         authz => AuthzTypes
     }.
 
+get_gateway_info() ->
+    try
+        emqx_gateway:get_basic_usage_info()
+    catch
+        %% if gateway is not available, for instance
+        _:_ ->
+            #{}
+    end.
+
 bin(L) when is_list(L) ->
     list_to_binary(L);
 bin(A) when is_atom(A) ->

+ 55 - 8
apps/emqx_modules/test/emqx_telemetry_SUITE.erl

@@ -81,6 +81,12 @@ init_per_testcase(t_get_telemetry, Config) ->
             {ok, Rendered}
         end
     ),
+    Lwm2mDataDir = emqx_common_test_helpers:deps_path(
+        emqx_gateway,
+        "test/emqx_gateway_SUITE_data"
+    ),
+    ok = emqx_gateway_SUITE:setup_fake_usage_data(Lwm2mDataDir),
+    {ok, _} = application:ensure_all_started(emqx_gateway),
     Config;
 init_per_testcase(t_advanced_mqtt_features, Config) ->
     OldValues = emqx_modules:get_advanced_mqtt_features_in_use(),
@@ -99,6 +105,16 @@ init_per_testcase(t_authn_authz_info, Config) ->
     create_authn('ws:default', redis),
     create_authz(postgresql),
     Config;
+init_per_testcase(t_enable, Config) ->
+    ok = meck:new(emqx_telemetry, [non_strict, passthrough, no_history, no_link]),
+    ok = meck:expect(emqx_telemetry, official_version, fun(_) -> true end),
+    mock_httpc(),
+    Config;
+init_per_testcase(t_send_after_enable, Config) ->
+    ok = meck:new(emqx_telemetry, [non_strict, passthrough, no_history, no_link]),
+    ok = meck:expect(emqx_telemetry, official_version, fun(_) -> true end),
+    mock_httpc(),
+    Config;
 init_per_testcase(_Testcase, Config) ->
     TestPID = self(),
     ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]),
@@ -111,6 +127,7 @@ init_per_testcase(_Testcase, Config) ->
 
 end_per_testcase(t_get_telemetry, _Config) ->
     meck:unload([httpc, emqx_telemetry]),
+    application:stop(emqx_gateway),
     ok;
 end_per_testcase(t_advanced_mqtt_features, Config) ->
     OldValues = ?config(old_values, Config),
@@ -128,6 +145,10 @@ end_per_testcase(t_authn_authz_info, _Config) ->
         ['mqtt:global', 'tcp:default', 'ws:default']
     ),
     ok;
+end_per_testcase(t_enable, _Config) ->
+    meck:unload([httpc, emqx_telemetry]);
+end_per_testcase(t_send_after_enable, _Config) ->
+    meck:unload([httpc, emqx_telemetry]);
 end_per_testcase(_Testcase, _Config) ->
     meck:unload([httpc]),
     ok.
@@ -190,6 +211,30 @@ t_get_telemetry(_Config) ->
     ?assert(is_number(maps:get(messages_received_rate, MQTTRTInsights))),
     ?assert(is_integer(maps:get(num_topics, MQTTRTInsights))),
     ?assert(is_map(get_value(authn_authz, TelemetryData))),
+    GatewayInfo = get_value(gateway, TelemetryData),
+    ?assert(is_map(GatewayInfo)),
+    lists:foreach(
+        fun({GatewayType, GatewayData}) ->
+            ?assertMatch(
+                #{
+                    authn := GwAuthn,
+                    num_clients := NClients,
+                    listeners := Ls
+                } when
+                    is_binary(GwAuthn) andalso
+                        is_integer(NClients) andalso
+                        is_list(Ls),
+                GatewayData,
+                #{gateway_type => GatewayType}
+            ),
+            ListenersData = maps:get(listeners, GatewayData),
+            lists:foreach(
+                fun(L) -> assert_gateway_listener_shape(L, GatewayType) end,
+                ListenersData
+            )
+        end,
+        maps:to_list(GatewayInfo)
+    ),
     ok.
 
 t_advanced_mqtt_features(_) ->
@@ -238,15 +283,10 @@ t_authn_authz_info(_) ->
     ).
 
 t_enable(_) ->
-    ok = meck:new(emqx_telemetry, [non_strict, passthrough, no_history, no_link]),
-    ok = meck:expect(emqx_telemetry, official_version, fun(_) -> true end),
     ok = emqx_telemetry:enable(),
-    ok = emqx_telemetry:disable(),
-    meck:unload([emqx_telemetry]).
+    ok = emqx_telemetry:disable().
 
 t_send_after_enable(_) ->
-    ok = meck:new(emqx_telemetry, [non_strict, passthrough, no_history, no_link]),
-    ok = meck:expect(emqx_telemetry, official_version, fun(_) -> true end),
     ok = emqx_telemetry:disable(),
     ok = snabbkaffe:start_trace(),
     try
@@ -286,8 +326,7 @@ t_send_after_enable(_) ->
             exit(telemetry_not_reported)
         end
     after
-        ok = snabbkaffe:stop(),
-        meck:unload([emqx_telemetry])
+        ok = snabbkaffe:stop()
     end.
 
 t_mqtt_runtime_insights(_) ->
@@ -380,6 +419,14 @@ create_authz(postgresql) ->
         }
     ).
 
+assert_gateway_listener_shape(ListenerData, GatewayType) ->
+    ?assertMatch(
+        #{type := LType, authn := LAuthn} when
+            is_atom(LType) andalso is_binary(LAuthn),
+        ListenerData,
+        #{gateway_type => GatewayType}
+    ).
+
 set_special_configs(emqx_authz) ->
     {ok, _} = emqx:update_config([authorization, cache, enable], false),
     {ok, _} = emqx:update_config([authorization, no_match], deny),

+ 1 - 0
apps/emqx_plugins/test/emqx_plugins_tests.erl

@@ -18,6 +18,7 @@
 
 -include_lib("eunit/include/eunit.hrl").
 
+-compile(nowarn_export_all).
 -compile(export_all).
 
 ensure_configured_test_todo() ->