Просмотр исходного кода

Merge pull request #10961 from HJianBo/support-infinity-max-conns-in-gateway

Adds support for unlimited max connections for gateway listeners
zhongwencool 2 лет назад
Родитель
Сommit
61bbe19eba

+ 22 - 3
apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl

@@ -99,9 +99,7 @@ t_monitor_current_api_live_connections(_) ->
     ok = emqtt:disconnect(C),
     {ok, C1} = emqtt:start_link([{clean_start, true}, {clientid, ClientId1}]),
     {ok, _} = emqtt:connect(C1),
-    %% waiting for emqx_stats ticker
-    timer:sleep(1500),
-    _ = emqx_dashboard_monitor:current_rate(),
+    ok = waiting_emqx_stats_and_monitor_update('live_connections.max'),
     {ok, Rate} = request(["monitor_current"]),
     ?assertEqual(1, maps:get(<<"live_connections">>, Rate)),
     ?assertEqual(2, maps:get(<<"connections">>, Rate)),
@@ -181,3 +179,24 @@ wait_new_monitor(OldMonitor, Count) ->
             timer:sleep(100),
             wait_new_monitor(OldMonitor, Count - 1)
     end.
+
+waiting_emqx_stats_and_monitor_update(WaitKey) ->
+    Self = self(),
+    meck:new(emqx_stats, [passthrough]),
+    meck:expect(
+        emqx_stats,
+        setstat,
+        fun(Stat, MaxStat, Val) ->
+            (Stat =:= WaitKey orelse MaxStat =:= WaitKey) andalso (Self ! updated),
+            meck:passthrough([Stat, MaxStat, Val])
+        end
+    ),
+    receive
+        updated -> ok
+    after 5000 ->
+        error(waiting_emqx_stats_update_timeout)
+    end,
+    meck:unload([emqx_stats]),
+    %% manually call monitor update
+    _ = emqx_dashboard_monitor:current_rate(),
+    ok.

+ 17 - 5
apps/emqx_gateway/src/emqx_gateway_api_listeners.erl

@@ -304,8 +304,7 @@ do_listeners_cluster_status(Listeners) ->
                     status => #{
                         running => Running,
                         current_connections => Curr,
-                        %% XXX: Since it is taken from raw-conf, it is possible a string
-                        max_connections => int(Max)
+                        max_connections => ensure_integer_or_infinity(Max)
                     }
                 }
             }
@@ -314,10 +313,15 @@ do_listeners_cluster_status(Listeners) ->
         Listeners
     ).
 
-int(B) when is_binary(B) ->
+ensure_integer_or_infinity(infinity) ->
+    infinity;
+ensure_integer_or_infinity(<<"infinity">>) ->
+    infinity;
+ensure_integer_or_infinity(B) when is_binary(B) ->
     binary_to_integer(B);
-int(I) when is_integer(I) ->
+ensure_integer_or_infinity(I) when is_integer(I) ->
     I.
+
 aggregate_listener_status(NodeStatus) ->
     aggregate_listener_status(NodeStatus, 0, 0, undefined).
 
@@ -330,11 +334,19 @@ aggregate_listener_status(
     CurrAcc,
     RunningAcc
 ) ->
+    NMaxAcc = plus_max_connections(MaxAcc, Max),
     NRunning = aggregate_running(Running, RunningAcc),
-    aggregate_listener_status(T, MaxAcc + Max, Current + CurrAcc, NRunning);
+    aggregate_listener_status(T, NMaxAcc, Current + CurrAcc, NRunning);
 aggregate_listener_status([], MaxAcc, CurrAcc, RunningAcc) ->
     {MaxAcc, CurrAcc, RunningAcc}.
 
+plus_max_connections(_, infinity) ->
+    infinity;
+plus_max_connections(infinity, _) ->
+    infinity;
+plus_max_connections(A, B) when is_integer(A) andalso is_integer(B) ->
+    A + B.
+
 aggregate_running(R, R) -> R;
 aggregate_running(R, undefined) -> R;
 aggregate_running(_, _) -> inconsistent.

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway_schema.erl

@@ -266,7 +266,7 @@ common_listener_opts() ->
             )},
         {max_connections,
             sc(
-                integer(),
+                hoconsc:union([pos_integer(), infinity]),
                 #{
                     default => 1024,
                     desc => ?DESC(gateway_common_listener_max_connections)

+ 32 - 0
apps/emqx_gateway/test/emqx_gateway_api_SUITE.erl

@@ -411,6 +411,38 @@ t_listeners_tcp(_) ->
     {404, _} = request(get, "/gateways/stomp/listeners/stomp:tcp:def"),
     ok.
 
+t_listeners_max_conns(_) ->
+    {204, _} = request(put, "/gateways/stomp", #{}),
+    {404, _} = request(get, "/gateways/stomp/listeners"),
+    LisConf = #{
+        name => <<"def">>,
+        type => <<"tcp">>,
+        bind => <<"127.0.0.1:61613">>,
+        max_connections => 1024
+    },
+    {201, _} = request(post, "/gateways/stomp/listeners", LisConf),
+    {200, ConfResp} = request(get, "/gateways/stomp/listeners"),
+    assert_confs([LisConf], ConfResp),
+    {200, ConfResp1} = request(get, "/gateways/stomp/listeners/stomp:tcp:def"),
+    assert_confs(LisConf, ConfResp1),
+
+    LisConf2 = maps:merge(LisConf, #{max_connections => <<"infinity">>}),
+    {200, _} = request(
+        put,
+        "/gateways/stomp/listeners/stomp:tcp:def",
+        LisConf2
+    ),
+
+    {200, ConfResp2} = request(get, "/gateways/stomp/listeners/stomp:tcp:def"),
+    assert_confs(LisConf2, ConfResp2),
+
+    {200, [Listeners]} = request(get, "/gateways/stomp/listeners"),
+    ?assertMatch(#{max_connections := <<"infinity">>}, Listeners),
+
+    {204, _} = request(delete, "/gateways/stomp/listeners/stomp:tcp:def"),
+    {404, _} = request(get, "/gateways/stomp/listeners/stomp:tcp:def"),
+    ok.
+
 t_listeners_authn(_) ->
     GwConf = #{
         name => <<"stomp">>,

+ 3 - 0
changes/ce/feat-10961.en.md

@@ -0,0 +1,3 @@
+Adds support for unlimited max connections for gateway listeners by allowing
+infinity as a valid value for the `max_connections` field in the configuration
+and HTTP API