Просмотр исходного кода

Merge pull request #7666 from lafirest/fmt/delayed_slow_subs

Fmt/delayed slow subs
lafirest 3 лет назад
Родитель
Сommit
0637c5bbbd

+ 6 - 5
apps/emqx_slow_subs/include/emqx_slow_subs.hrl

@@ -23,12 +23,13 @@
 
 -define(MAX_SIZE, 1000).
 
--record(top_k, { index :: topk_index()
-               , last_update_time :: pos_integer()
-               , extra = []
-               }).
+-record(top_k, {
+    index :: topk_index(),
+    last_update_time :: pos_integer(),
+    extra = []
+}).
 
--record(index_tab, { index :: index()}).
+-record(index_tab, {index :: index()}).
 
 -type top_k() :: #top_k{}.
 -type index_tab() :: #index_tab{}.

+ 3 - 2
apps/emqx_slow_subs/rebar.config

@@ -1,4 +1,5 @@
 %% -*- mode: erlang -*-
 
-{deps, [ {emqx, {path, "../emqx"}}
-       ]}.
+{deps, [{emqx, {path, "../emqx"}}]}.
+
+{project_plugins, [erlfmt]}.

+ 63 - 56
apps/emqx_slow_subs/src/emqx_slow_subs_api.erl

@@ -41,48 +41,52 @@ paths() -> ["/slow_subscriptions", "/slow_subscriptions/settings"].
 
 schema(("/slow_subscriptions")) ->
     #{
-      'operationId' => slow_subs,
-      delete => #{tags => [<<"slow subs">>],
-                  description => <<"Clear current data and re count slow topic">>,
-                  parameters => [],
-                  'requestBody' => [],
-                  responses => #{204 => <<"No Content">>}
-                 },
-      get => #{tags => [<<"slow subs">>],
-               description => <<"Get slow topics statistics record data">>,
-               parameters => [ {page, mk(pos_integer(), #{in => query})}
-                             , {limit, mk(pos_integer(), #{in => query})}
-                             ],
-               'requestBody' => [],
-               responses => #{200 => [{data, mk(hoconsc:array(ref(record)), #{})}]}
-              }
-     };
-
+        'operationId' => slow_subs,
+        delete => #{
+            tags => [<<"slow subs">>],
+            description => <<"Clear current data and re count slow topic">>,
+            parameters => [],
+            'requestBody' => [],
+            responses => #{204 => <<"No Content">>}
+        },
+        get => #{
+            tags => [<<"slow subs">>],
+            description => <<"Get slow topics statistics record data">>,
+            parameters => [
+                {page, mk(pos_integer(), #{in => query})},
+                {limit, mk(pos_integer(), #{in => query})}
+            ],
+            'requestBody' => [],
+            responses => #{200 => [{data, mk(hoconsc:array(ref(record)), #{})}]}
+        }
+    };
 schema("/slow_subscriptions/settings") ->
-    #{'operationId' => settings,
-      get => #{tags => [<<"slow subs">>],
-               description => <<"Get slow subs settings">>,
-               responses => #{200 => conf_schema()}
-              },
-      put => #{tags => [<<"slow subs">>],
-               description => <<"Update slow subs settings">>,
-               'requestBody' => conf_schema(),
-               responses => #{200 => conf_schema()}
-              }
-     }.
+    #{
+        'operationId' => settings,
+        get => #{
+            tags => [<<"slow subs">>],
+            description => <<"Get slow subs settings">>,
+            responses => #{200 => conf_schema()}
+        },
+        put => #{
+            tags => [<<"slow subs">>],
+            description => <<"Update slow subs settings">>,
+            'requestBody' => conf_schema(),
+            responses => #{200 => conf_schema()}
+        }
+    }.
 
 fields(record) ->
-    [ {clientid,
-       mk(string(), #{desc => <<"the clientid">>})},
-      {node,
-       mk(string(), #{desc => <<"the node">>})},
-      {topic,
-       mk(string(), #{desc => <<"the topic">>})},
-      {timespan,
-       mk(integer(),
-          #{desc => <<"timespan for message transmission">>})},
-      {last_update_time,
-       mk(integer(), #{desc => <<"the timestamp of last update">>})}
+    [
+        {clientid, mk(string(), #{desc => <<"the clientid">>})},
+        {node, mk(string(), #{desc => <<"the node">>})},
+        {topic, mk(string(), #{desc => <<"the topic">>})},
+        {timespan,
+            mk(
+                integer(),
+                #{desc => <<"timespan for message transmission">>}
+            )},
+        {last_update_time, mk(integer(), #{desc => <<"the timestamp of last update">>})}
     ].
 
 conf_schema() ->
@@ -92,17 +96,17 @@ conf_schema() ->
 slow_subs(delete, _) ->
     _ = rpc_call(fun(Nodes) -> emqx_slow_subs_proto_v1:clear_history(Nodes) end),
     {204};
-
 slow_subs(get, _) ->
     NodeRankL = rpc_call(fun(Nodes) -> emqx_slow_subs_proto_v1:get_history(Nodes) end),
-    Fun = fun({ok, L}, Acc) -> L ++ Acc;
-             (_, Acc) -> Acc
-          end,
+    Fun = fun
+        ({ok, L}, Acc) -> L ++ Acc;
+        (_, Acc) -> Acc
+    end,
     RankL = lists:foldl(Fun, [], NodeRankL),
 
     SortFun = fun(#{timespan := A}, #{timespan := B}) ->
-                      A > B
-              end,
+        A > B
+    end,
 
     SortedL = lists:sort(SortFun, RankL),
     SortedL2 = lists:sublist(SortedL, ?MAX_SIZE),
@@ -112,22 +116,25 @@ slow_subs(get, _) ->
 get_history() ->
     Node = node(),
     RankL = ets:tab2list(?TOPK_TAB),
-    ConvFun = fun(#top_k{index = ?TOPK_INDEX(TimeSpan, ?ID(ClientId, Topic)),
-                         last_update_time = LastUpdateTime
-                        }) ->
-                      #{ clientid => ClientId
-                       , node => Node
-                       , topic => Topic
-                       , timespan => TimeSpan
-                       , last_update_time => LastUpdateTime
-                       }
-              end,
+    ConvFun = fun(
+        #top_k{
+            index = ?TOPK_INDEX(TimeSpan, ?ID(ClientId, Topic)),
+            last_update_time = LastUpdateTime
+        }
+    ) ->
+        #{
+            clientid => ClientId,
+            node => Node,
+            topic => Topic,
+            timespan => TimeSpan,
+            last_update_time => LastUpdateTime
+        }
+    end,
 
     lists:map(ConvFun, RankL).
 
 settings(get, _) ->
     {200, emqx:get_raw_config([slow_subs], #{})};
-
 settings(put, #{body := Body}) ->
     case emqx_slow_subs:update_settings(Body) of
         {ok, #{config := NewConf}} ->

+ 4 - 3
apps/emqx_slow_subs/src/emqx_slow_subs_app.erl

@@ -18,9 +18,10 @@
 
 -behaviour(application).
 
--export([ start/2
-        , stop/1
-        ]).
+-export([
+    start/2,
+    stop/1
+]).
 
 start(_Type, _Args) ->
     {ok, Sup} = emqx_slow_subs_sup:start_link(),

+ 26 - 17
apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl

@@ -9,23 +9,32 @@ namespace() -> "slow_subs".
 roots() -> ["slow_subs"].
 
 fields("slow_subs") ->
-    [ {enable, sc(boolean(), false, "Enable this feature.")}
-    , {threshold,
-       sc(emqx_schema:duration_ms(),
-          "500ms",
-          "The latency threshold for statistics, the minimum value is 100ms.")}
-    , {expire_interval,
-       sc(emqx_schema:duration_ms(),
-          "300s",
-          "The eviction time of the record, which in the statistics record table.")}
-    , {top_k_num,
-       sc(pos_integer(),
-          10,
-          "The maximum number of records in the slow subscription statistics record table.")}
-    , {stats_type,
-       sc(hoconsc:union([whole, internal, response]),
-          whole,
-          "The method to calculate the latency.")}
+    [
+        {enable, sc(boolean(), false, "Enable this feature.")},
+        {threshold,
+            sc(
+                emqx_schema:duration_ms(),
+                "500ms",
+                "The latency threshold for statistics, the minimum value is 100ms."
+            )},
+        {expire_interval,
+            sc(
+                emqx_schema:duration_ms(),
+                "300s",
+                "The eviction time of the record, which in the statistics record table."
+            )},
+        {top_k_num,
+            sc(
+                pos_integer(),
+                10,
+                "The maximum number of records in the slow subscription statistics record table."
+            )},
+        {stats_type,
+            sc(
+                hoconsc:union([whole, internal, response]),
+                whole,
+                "The method to calculate the latency."
+            )}
     ].
 
 desc("slow_subs") ->

+ 11 - 7
apps/emqx_slow_subs/src/emqx_slow_subs_sup.erl

@@ -27,10 +27,14 @@ start_link() ->
 
 init([]) ->
     emqx_slow_subs:init_tab(),
-    {ok, {{one_for_one, 10, 3600},
-          [#{id       => st_statistics,
-             start    => {emqx_slow_subs, start_link, []},
-             restart  => permanent,
-             shutdown => 5000,
-             type     => worker,
-             modules  => [emqx_slow_subs]}]}}.
+    {ok,
+        {{one_for_one, 10, 3600}, [
+            #{
+                id => st_statistics,
+                start => {emqx_slow_subs, start_link, []},
+                restart => permanent,
+                shutdown => 5000,
+                type => worker,
+                modules => [emqx_slow_subs]
+            }
+        ]}}.

+ 44 - 33
apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl

@@ -27,13 +27,17 @@
 -define(NOW, erlang:system_time(millisecond)).
 -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
 
--define(BASE_CONF, <<"""
-slow_subs {
-    enable = true
-	top_k_num = 5,
-    expire_interval = 5m
-    stats_type = whole
-    }""">>).
+-define(BASE_CONF, <<
+    ""
+    "\n"
+    "slow_subs {\n"
+    "    enable = true\n"
+    "	top_k_num = 5,\n"
+    "    expire_interval = 5m\n"
+    "    stats_type = whole\n"
+    "    }"
+    ""
+>>).
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
@@ -46,7 +50,6 @@ init_per_suite(Config) ->
     meck:expect(emqx_alarm, activate, 3, ok),
     meck:expect(emqx_alarm, deactivate, 3, ok),
 
-
     ok = emqx_common_test_helpers:load_config(emqx_slow_subs_schema, ?BASE_CONF),
     emqx_common_test_helpers:start_apps([emqx_slow_subs]),
     Config.
@@ -64,13 +67,13 @@ init_per_testcase(t_expire, Config) ->
     Cfg = emqx_config:get([slow_subs]),
     emqx_slow_subs:update_settings(Cfg#{expire_interval := 1500}),
     Config;
-
 init_per_testcase(_, Config) ->
     Config.
 
 end_per_testcase(_, _) ->
     case erlang:whereis(node()) of
-        undefined -> ok;
+        undefined ->
+            ok;
         P ->
             erlang:unlink(P),
             erlang:exit(P, kill)
@@ -88,21 +91,25 @@ t_pub(_) ->
     Now = ?NOW,
     %% publish
 
-    lists:foreach(fun(I) ->
-                          Topic = list_to_binary(io_lib:format("/test1/~p", [I])),
-                          Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>),
-                          emqx:publish(Msg#message{timestamp = Now - 500}),
-                          timer:sleep(100)
-                  end,
-                  lists:seq(1, 10)),
-
-    lists:foreach(fun(I) ->
-                          Topic = list_to_binary(io_lib:format("/test2/~p", [I])),
-                          Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>),
-                          emqx:publish(Msg#message{timestamp = Now - 500}),
-                          timer:sleep(100)
-                  end,
-                  lists:seq(1, 10)),
+    lists:foreach(
+        fun(I) ->
+            Topic = list_to_binary(io_lib:format("/test1/~p", [I])),
+            Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>),
+            emqx:publish(Msg#message{timestamp = Now - 500}),
+            timer:sleep(100)
+        end,
+        lists:seq(1, 10)
+    ),
+
+    lists:foreach(
+        fun(I) ->
+            Topic = list_to_binary(io_lib:format("/test2/~p", [I])),
+            Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>),
+            emqx:publish(Msg#message{timestamp = Now - 500}),
+            timer:sleep(100)
+        end,
+        lists:seq(1, 10)
+    ),
 
     timer:sleep(1000),
     Size = ets:info(?TOPK_TAB, size),
@@ -114,10 +121,12 @@ t_pub(_) ->
 t_expire(_) ->
     Now = ?NOW,
     Each = fun(I) ->
-                   ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])),
-                   ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(1, ?ID(ClientId, <<"topic">>)),
-                                                last_update_time = Now - timer:minutes(5)})
-           end,
+        ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])),
+        ets:insert(?TOPK_TAB, #top_k{
+            index = ?TOPK_INDEX(1, ?ID(ClientId, <<"topic">>)),
+            last_update_time = Now - timer:minutes(5)
+        })
+    end,
 
     lists:foreach(Each, lists:seq(1, 5)),
 
@@ -130,10 +139,12 @@ start_client(Subs) ->
     [spawn(fun() -> client(I, Subs) end) || I <- lists:seq(1, 10)].
 
 client(I, Subs) ->
-    {ok, C} = emqtt:start_link([{host,      "localhost"},
-                                {clientid,  io_lib:format("slow_subs_~p", [I])},
-                                {username,  <<"plain">>},
-                                {password,  <<"plain">>}]),
+    {ok, C} = emqtt:start_link([
+        {host, "localhost"},
+        {clientid, io_lib:format("slow_subs_~p", [I])},
+        {username, <<"plain">>},
+        {password, <<"plain">>}
+    ]),
     {ok, _} = emqtt:connect(C),
 
     Len = erlang:length(Subs),

+ 77 - 50
apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl

@@ -34,15 +34,18 @@
 -define(NOW, erlang:system_time(millisecond)).
 -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
 
--define(CONF_DEFAULT, <<"""
-slow_subs
-{
- enable = true
- top_k_num = 5,
- expire_interval = 60000
- stats_type = whole
-}""">>).
-
+-define(CONF_DEFAULT, <<
+    ""
+    "\n"
+    "slow_subs\n"
+    "{\n"
+    " enable = true\n"
+    " top_k_num = 5,\n"
+    " expire_interval = 60000\n"
+    " stats_type = whole\n"
+    "}"
+    ""
+>>).
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
@@ -79,7 +82,8 @@ init_per_testcase(_, Config) ->
 end_per_testcase(_, Config) ->
     application:stop(emqx_slow_subs),
     case erlang:whereis(node()) of
-        undefined -> ok;
+        undefined ->
+            ok;
         P ->
             erlang:unlink(P),
             erlang:exit(P, kill)
@@ -89,50 +93,70 @@ end_per_testcase(_, Config) ->
 t_get_history(_) ->
     Now = ?NOW,
     Each = fun(I) ->
-                   ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])),
-                   ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(1, ?ID(ClientId, <<"topic">>)),
-                                                last_update_time = Now})
-           end,
+        ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])),
+        ets:insert(?TOPK_TAB, #top_k{
+            index = ?TOPK_INDEX(1, ?ID(ClientId, <<"topic">>)),
+            last_update_time = Now
+        })
+    end,
 
     lists:foreach(Each, lists:seq(1, 5)),
 
-    {ok, Data} = request_api(get, api_path(["slow_subscriptions"]), "page=1&limit=10",
-                             auth_header_()),
+    {ok, Data} = request_api(
+        get,
+        api_path(["slow_subscriptions"]),
+        "page=1&limit=10",
+        auth_header_()
+    ),
     #{<<"data">> := [First | _]} = emqx_json:decode(Data, [return_maps]),
 
-    ?assertMatch(#{<<"clientid">> := <<"test_5">>,
-                   <<"topic">> := <<"topic">>,
-                   <<"last_update_time">> := Now,
-                   <<"node">> := _,
-                   <<"timespan">> := _}, First).
+    ?assertMatch(
+        #{
+            <<"clientid">> := <<"test_5">>,
+            <<"topic">> := <<"topic">>,
+            <<"last_update_time">> := Now,
+            <<"node">> := _,
+            <<"timespan">> := _
+        },
+        First
+    ).
 
 t_clear(_) ->
-    ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(1, ?ID(<<"clientid">>, <<"topic">>)),
-                                 last_update_time = ?NOW}),
-
-    {ok, _} = request_api(delete, api_path(["slow_subscriptions"]), [],
-                          auth_header_()),
+    ets:insert(?TOPK_TAB, #top_k{
+        index = ?TOPK_INDEX(1, ?ID(<<"clientid">>, <<"topic">>)),
+        last_update_time = ?NOW
+    }),
+
+    {ok, _} = request_api(
+        delete,
+        api_path(["slow_subscriptions"]),
+        [],
+        auth_header_()
+    ),
 
     ?assertEqual(0, ets:info(?TOPK_TAB, size)).
 
 t_settting(_) ->
     Conf = emqx:get_config([slow_subs]),
     Conf2 = Conf#{stats_type => internal},
-    {ok, Data} = request_api(put,
-                             api_path(["slow_subscriptions", "settings"]),
-                             [],
-                             auth_header_(),
-                             Conf2),
+    {ok, Data} = request_api(
+        put,
+        api_path(["slow_subscriptions", "settings"]),
+        [],
+        auth_header_(),
+        Conf2
+    ),
 
     Return = decode_json(Data),
 
     ?assertEqual(Conf2#{stats_type := <<"internal">>}, Return),
 
-    {ok, GetData} = request_api(get,
-                                api_path(["slow_subscriptions", "settings"]),
-                                [],
-                                auth_header_()
-                               ),
+    {ok, GetData} = request_api(
+        get,
+        api_path(["slow_subscriptions", "settings"]),
+        [],
+        auth_header_()
+    ),
 
     timer:sleep(1000),
 
@@ -151,25 +175,28 @@ request_api(Method, Url, QueryParams, Auth) ->
     request_api(Method, Url, QueryParams, Auth, []).
 
 request_api(Method, Url, QueryParams, Auth, []) ->
-    NewUrl = case QueryParams of
-                 "" -> Url;
-                 _ -> Url ++ "?" ++ QueryParams
-             end,
+    NewUrl =
+        case QueryParams of
+            "" -> Url;
+            _ -> Url ++ "?" ++ QueryParams
+        end,
     do_request_api(Method, {NewUrl, [Auth]});
 request_api(Method, Url, QueryParams, Auth, Body) ->
-    NewUrl = case QueryParams of
-                 "" -> Url;
-                 _ -> Url ++ "?" ++ QueryParams
-             end,
+    NewUrl =
+        case QueryParams of
+            "" -> Url;
+            _ -> Url ++ "?" ++ QueryParams
+        end,
     do_request_api(Method, {NewUrl, [Auth], "application/json", emqx_json:encode(Body)}).
 
-do_request_api(Method, Request)->
+do_request_api(Method, Request) ->
     ct:pal("Method: ~p, Request: ~p", [Method, Request]),
     case httpc:request(Method, Request, [], [{body_format, binary}]) of
         {error, socket_closed_remotely} ->
             {error, socket_closed_remotely};
-        {ok, {{"HTTP/1.1", Code, _}, _, Return} }
-          when Code =:= 200 orelse Code =:= 204 ->
+        {ok, {{"HTTP/1.1", Code, _}, _, Return}} when
+            Code =:= 200 orelse Code =:= 204
+        ->
             {ok, Return};
         {ok, {Reason, _, _}} ->
             {error, Reason}
@@ -181,8 +208,8 @@ auth_header_() ->
     auth_header_(binary_to_list(AppId), binary_to_list(AppSecret)).
 
 auth_header_(User, Pass) ->
-    Encoded = base64:encode_to_string(lists:append([User,":",Pass])),
-    {"Authorization","Basic " ++ Encoded}.
+    Encoded = base64:encode_to_string(lists:append([User, ":", Pass])),
+    {"Authorization", "Basic " ++ Encoded}.
 
-api_path(Parts)->
+api_path(Parts) ->
     ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts).

+ 2 - 0
git-blame-ignore-revs

@@ -23,3 +23,5 @@ acb3544d4b112121b5d9414237d2af7860ccc2a3
 f1acfece6b79ed69b491da03783a7adaa7627b96
 # reformat apps/emqx_management
 aa7807baebfa5d8678025e43f386bcd9b3259d6a
+# reformat apps/emqx_slow_subs
+83511f8a4c1570a2c89d9c6c5b6f462520199ed8

+ 1 - 1
scripts/check-format.sh

@@ -12,7 +12,7 @@ APPS+=( 'apps/emqx' 'apps/emqx_modules' 'apps/emqx_gateway')
 APPS+=( 'apps/emqx_authn' 'apps/emqx_authz' )
 APPS+=( 'lib-ee/emqx_enterprise_conf' 'lib-ee/emqx_license' )
 APPS+=( 'apps/emqx_exhook')
-APPS+=( 'apps/emqx_retainer')
+APPS+=( 'apps/emqx_retainer' 'apps/emqx_slow_subs')
 
 for app in "${APPS[@]}"; do
     echo "$app ..."