Просмотр исходного кода

fix(live_conn): fix live connection count on race condition (5.0)

Port from #6406 to 5.0.

When multiple clients try to connect concurrently using the same
client ID, they all call `emqx_channel:ensure_connected`, increasing
the live connection count, but only one will successfully acquire the
lock for that client ID.  This means that all other clients that
increased the live connection count will not get to call neither
`emqx_channel:ensure_disconnected` nor be monitored for `DOWN`
messages, effectively causing a count leak.

By moving the increment to `emqx_cm:register_channel`, which is only
called inside the lock, we can remove this leakage.

Also, during the handling of `DOWN` messages, we now iterate over all
channel PIDs returned by `eqmx_misc:drain_down`, since it could be
that one or more PIDs are not contained in the `pmon` state.
Thales Macedo Garitezi 4 лет назад
Родитель
Сommit
2b5fe9179e
3 измененных файлов с 77 добавлено и 7 удалено
  1. 0 2
      apps/emqx/src/emqx_channel.erl
  2. 2 5
      apps/emqx/src/emqx_cm.erl
  3. 75 0
      apps/emqx/test/emqx_broker_SUITE.erl

+ 0 - 2
apps/emqx/src/emqx_channel.erl

@@ -1641,8 +1641,6 @@ ensure_connected(Channel = #channel{conninfo = ConnInfo,
                                     clientinfo = ClientInfo}) ->
     NConnInfo = ConnInfo#{connected_at => erlang:system_time(millisecond)},
     ok = run_hooks('client.connected', [ClientInfo, NConnInfo]),
-    ChanPid = self(),
-    emqx_cm:mark_channel_connected(ChanPid),
     Channel#channel{conninfo   = NConnInfo,
                     conn_state = connected
                    }.

+ 2 - 5
apps/emqx/src/emqx_cm.erl

@@ -142,6 +142,7 @@ register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid)
     true = ets:insert(?CHAN_TAB, Chan),
     true = ets:insert(?CHAN_CONN_TAB, {Chan, ConnMod}),
     ok = emqx_cm_registry:register_channel(Chan),
+    mark_channel_connected(ChanPid),
     cast({registered, Chan}).
 
 %% @doc Unregister a channel.
@@ -549,11 +550,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}
     ?tp(emqx_cm_process_down, #{pid => Pid, reason => _Reason}),
     ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
     {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
-    lists:foreach(
-      fun({ChanPid, _ClientID}) ->
-              mark_channel_disconnected(ChanPid)
-      end,
-      Items),
+    lists:foreach(fun mark_channel_disconnected/1, ChanPids),
     ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]),
     {noreply, State#{chan_pmon := PMon1}};
 handle_info(Info, State) ->

+ 75 - 0
apps/emqx/test/emqx_broker_SUITE.erl

@@ -37,6 +37,7 @@ groups() ->
     TCs = emqx_common_test_helpers:all(?MODULE),
     ConnClientTCs = [ t_connected_client_count_persistent
                     , t_connected_client_count_anonymous
+                    , t_connected_client_count_transient_takeover
                     , t_connected_client_stats
                     ],
     OtherTCs = TCs -- ConnClientTCs,
@@ -461,6 +462,80 @@ t_connected_client_count_anonymous({'end', _Config}) ->
     snabbkaffe:stop(),
     ok.
 
+t_connected_client_count_transient_takeover({init, Config}) ->
+    ok = snabbkaffe:start_trace(),
+    process_flag(trap_exit, true),
+    Config;
+t_connected_client_count_transient_takeover(Config) when is_list(Config) ->
+    ConnFun = ?config(conn_fun, Config),
+    ClientID = <<"clientid">>,
+    ?assertEqual(0, emqx_cm:get_connected_client_count()),
+    %% we spawn several clients simultaneously to cause the race
+    %% condition for the client id lock
+    NumClients = 20,
+    {ok, {ok, [_, _]}} =
+        wait_for_events(
+          fun() ->
+                  lists:foreach(
+                    fun(_) ->
+                            spawn(
+                              fun() ->
+                                      {ok, ConnPid} =
+                                          emqtt:start_link([ {clean_start, true}
+                                                           , {clientid, ClientID}
+                                                           | Config]),
+                                      %% don't assert the result: most of them fail
+                                      %% during the race
+                                      emqtt:ConnFun(ConnPid),
+                                      ok
+                              end),
+                            ok
+                    end,
+                    lists:seq(1, NumClients))
+          end,
+          %% there can be only one channel that wins the race for the
+          %% lock for this client id.  we also expect a decrement
+          %% event because the client dies along with the ephemeral
+          %% process.
+          [ emqx_cm_connected_client_count_inc
+          , emqx_cm_connected_client_count_dec
+          ],
+          1000),
+    %% Since more than one pair of inc/dec may be emitted, we need to
+    %% wait for full stabilization
+    timer:sleep(100),
+    %% It must be 0 again because we spawn-linked the clients in
+    %% ephemeral processes above, and all should be dead now.
+    ?assertEqual(0, emqx_cm:get_connected_client_count()),
+    %% connecting again
+    {ok, ConnPid1} = emqtt:start_link([ {clean_start, true}
+                                      , {clientid, ClientID}
+                                      | Config
+                                      ]),
+    {{ok, _}, {ok, [_]}} =
+        wait_for_events(
+          fun() -> emqtt:ConnFun(ConnPid1) end,
+          [emqx_cm_connected_client_count_inc]
+         ),
+    ?assertEqual(1, emqx_cm:get_connected_client_count()),
+    %% abnormal exit of channel process
+    [ChanPid] = emqx_cm:all_channels(),
+    {ok, {ok, [_, _]}} =
+        wait_for_events(
+          fun() ->
+                  exit(ChanPid, kill),
+                  ok
+          end,
+          [ emqx_cm_connected_client_count_dec
+          , emqx_cm_process_down
+          ]
+         ),
+    ?assertEqual(0, emqx_cm:get_connected_client_count()),
+    ok;
+t_connected_client_count_transient_takeover({'end', _Config}) ->
+    snabbkaffe:stop(),
+    ok.
+
 t_connected_client_stats({init, Config}) ->
     ok = supervisor:terminate_child(emqx_kernel_sup, emqx_stats),
     {ok, _} = supervisor:restart_child(emqx_kernel_sup, emqx_stats),