Przeglądaj źródła

Merge pull request #13910 from zmstone/1001-cache-local-connections-count

perf(license): cache local connections count
zmstone 1 rok temu
rodzic
commit
18f46ec719

+ 3 - 8
apps/emqx_gateway/src/emqx_gateway_cm_registry.erl

@@ -99,16 +99,11 @@ get_connected_client_count() ->
     Gatewyas = emqx_gateway_utils:find_gateway_definitions(),
     Fun = fun(#{name := Name}, Acc) ->
         Tab = tabname(Name),
-        case ets:whereis(Tab) of
+        case ets:info(Tab, size) of
             undefined ->
                 Acc;
-            _ ->
-                case ets:info(Tab, size) of
-                    undefined ->
-                        Acc;
-                    Size ->
-                        Acc + Size
-                end
+            Size ->
+                Acc + Size
         end
     end,
     lists:foldl(Fun, 0, Gatewyas).

+ 16 - 0
apps/emqx_gateway/src/emqx_gateway_utils.erl

@@ -704,6 +704,12 @@ default_subopts() ->
 
 -spec find_gateway_definitions() -> list(gateway_def()).
 find_gateway_definitions() ->
+    read_pt_populate_if_missing(
+        emqx_gateways,
+        fun do_find_gateway_definitions/0
+    ).
+
+do_find_gateway_definitions() ->
     lists:flatmap(
         fun(App) ->
             lists:flatmap(fun gateways/1, find_attrs(App, gateway))
@@ -711,6 +717,16 @@ find_gateway_definitions() ->
         ?GATEWAYS
     ).
 
+read_pt_populate_if_missing(Key, Fn) ->
+    case persistent_term:get(Key, no_value) of
+        no_value ->
+            Value = Fn(),
+            _ = persistent_term:put(Key, {value, Value}),
+            Value;
+        {value, Value} ->
+            Value
+    end.
+
 -spec find_gateway_definition(atom()) -> {ok, map()} | {error, term()}.
 find_gateway_definition(Name) ->
     find_gateway_definition(Name, ?GATEWAYS).

+ 1 - 1
apps/emqx_license/src/emqx_license.erl

@@ -172,7 +172,7 @@ do_update(NewConf, _PrevConf) ->
     do_update({key, NewKey}, NewConf).
 
 check_max_clients_exceeded(MaxClients) ->
-    emqx_license_resources:connection_count() > MaxClients * 1.1.
+    emqx_license_resources:cached_connection_count() > MaxClients * 1.1.
 
 read_license(#{key := Content}) ->
     emqx_license_parser:parse(Content).

+ 11 - 14
apps/emqx_license/src/emqx_license_resources.erl

@@ -13,8 +13,10 @@
 -export([
     start_link/0,
     start_link/1,
+    %% RPC
     local_connection_count/0,
-    connection_count/0
+    %% hot call
+    cached_connection_count/0
 ]).
 
 %% gen_server callbacks
@@ -54,11 +56,6 @@ start_link(CheckInterval) when is_integer(CheckInterval) ->
 local_connection_count() ->
     emqx_cm:get_connected_client_count().
 
--spec connection_count() -> non_neg_integer().
-connection_count() ->
-    local_connection_count() + cached_remote_connection_count() +
-        emqx_gateway_cm_registry:get_connected_client_count().
-
 %%------------------------------------------------------------------------------
 %% gen_server callbacks
 %%------------------------------------------------------------------------------
@@ -93,7 +90,7 @@ connection_quota_early_alarm() ->
     connection_quota_early_alarm(emqx_license_checker:limits()).
 
 connection_quota_early_alarm({ok, #{max_connections := Max}}) when is_integer(Max) ->
-    Count = connection_count(),
+    Count = cached_connection_count(),
     Low = emqx_conf:get([license, connection_low_watermark], 0.75),
     High = emqx_conf:get([license, connection_high_watermark], 0.80),
     Count > Max * High andalso
@@ -108,16 +105,16 @@ connection_quota_early_alarm({ok, #{max_connections := Max}}) when is_integer(Ma
 connection_quota_early_alarm(_Limits) ->
     ok.
 
-cached_remote_connection_count() ->
-    try ets:lookup(?MODULE, remote_connection_count) of
-        [{remote_connection_count, N}] -> N;
+cached_connection_count() ->
+    try ets:lookup(?MODULE, total_connection_count) of
+        [{total_connection_count, N}] -> N;
         _ -> 0
     catch
         error:badarg -> 0
     end.
 
 update_resources() ->
-    ets:insert(?MODULE, {remote_connection_count, remote_connection_count()}).
+    ets:insert(?MODULE, {total_connection_count, total_connection_count()}).
 
 ensure_timer(#{check_peer_interval := CheckInterval} = State) ->
     _ =
@@ -127,8 +124,8 @@ ensure_timer(#{check_peer_interval := CheckInterval} = State) ->
         end,
     State#{timer => erlang:send_after(CheckInterval, self(), update_resources)}.
 
-remote_connection_count() ->
-    Nodes = mria:running_nodes() -- [node()],
+total_connection_count() ->
+    Nodes = mria:running_nodes(),
     Results = emqx_license_proto_v2:remote_connection_counts(Nodes),
     Counts = [Count || {ok, Count} <- Results],
-    lists:sum(Counts).
+    lists:sum(Counts) + emqx_gateway_cm_registry:get_connected_client_count().

+ 28 - 13
apps/emqx_license/test/emqx_license_SUITE.erl

@@ -74,20 +74,22 @@ t_check_exceeded(_Config) ->
             "10"
         ]
     ),
-    #{} = emqx_license_checker:update(License),
+    #{} = update(License),
 
-    ok = lists:foreach(
+    Pids = lists:map(
         fun(_) ->
             {ok, C} = emqtt:start_link(),
-            {ok, _} = emqtt:connect(C)
+            {ok, _} = emqtt:connect(C),
+            C
         end,
         lists:seq(1, 12)
     ),
-
+    sync_cache(),
     ?assertEqual(
         {stop, {error, ?RC_QUOTA_EXCEEDED}},
         emqx_license:check(#{}, #{})
-    ).
+    ),
+    ok = lists:foreach(fun(Pid) -> emqtt:stop(Pid) end, Pids).
 
 t_check_ok(_Config) ->
     {_, License} = mk_license(
@@ -103,20 +105,21 @@ t_check_ok(_Config) ->
             "10"
         ]
     ),
-    #{} = emqx_license_checker:update(License),
+    #{} = update(License),
 
-    ok = lists:foreach(
-        fun(_) ->
-            {ok, C} = emqtt:start_link(),
-            {ok, _} = emqtt:connect(C)
+    Pids = lists:map(
+        fun(I) ->
+            {ok, C} = emqtt:start_link([{proto_ver, v5}]),
+            ?assertMatch({I, {ok, _}}, {I, emqtt:connect(C)}),
+            C
         end,
         lists:seq(1, 11)
     ),
-
     ?assertEqual(
         {ok, #{}},
         emqx_license:check(#{}, #{})
-    ).
+    ),
+    ok = lists:foreach(fun(Pid) -> emqtt:stop(Pid) end, Pids).
 
 t_check_expired(_Config) ->
     {_, License} = mk_license(
@@ -135,7 +138,7 @@ t_check_expired(_Config) ->
             "10"
         ]
     ),
-    #{} = emqx_license_checker:update(License),
+    #{} = update(License),
 
     ?assertEqual(
         {stop, {error, ?RC_QUOTA_EXCEEDED}},
@@ -190,3 +193,15 @@ mk_license(Fields) ->
         emqx_license_test_lib:public_key_pem()
     ),
     {EncodedLicense, License}.
+
+update(License) ->
+    Result = emqx_license_checker:update(License),
+    sync_cache(),
+    Result.
+
+sync_cache() ->
+    %% force refresh the cache
+    _ = whereis(emqx_license_resources) ! update_resources,
+    %% force sync with the process
+    _ = sys:get_state(whereis(emqx_license_resources)),
+    ok.

+ 4 - 4
apps/emqx_license/test/emqx_license_resources_SUITE.erl

@@ -42,7 +42,7 @@ t_connection_count(_Config) ->
                 #{?snk_kind := emqx_license_resources_updated},
                 1000
             ),
-            emqx_license_resources:connection_count()
+            emqx_license_resources:cached_connection_count()
         end,
         fun(ConnCount, Trace) ->
             ?assertEqual(0, ConnCount),
@@ -57,8 +57,8 @@ t_connection_count(_Config) ->
     meck:expect(
         emqx_license_proto_v2,
         remote_connection_counts,
-        fun(_Nodes) ->
-            [{ok, 5}, {error, some_error}]
+        fun(Nodes) ->
+            [{ok, 5}, {error, some_error}] ++ meck:passthrough([Nodes])
         end
     ),
 
@@ -69,7 +69,7 @@ t_connection_count(_Config) ->
                 #{?snk_kind := emqx_license_resources_updated},
                 1000
             ),
-            emqx_license_resources:connection_count()
+            emqx_license_resources:cached_connection_count()
         end,
         fun(ConnCount, _Trace) ->
             ?assertEqual(15, ConnCount)

+ 1 - 0
changes/ee/perf-13910.en.md

@@ -0,0 +1 @@
+Performance enhancement for Enterprise edition license check.