Przeglądaj źródła

Merge pull request #14201 from zhongwencool/fix-check-gc-check

fix: mismatch the check_gc message
zmstone 1 rok temu
rodzic
commit
9e4db0bf06

+ 1 - 1
Makefile

@@ -11,7 +11,7 @@ include env.sh
 # Dashboard version
 # from https://github.com/emqx/emqx-dashboard5
 export EMQX_DASHBOARD_VERSION ?= v1.10.1-1
-export EMQX_EE_DASHBOARD_VERSION ?= e1.8.2-beta.3
+export EMQX_EE_DASHBOARD_VERSION ?= e1.8.2
 
 export EMQX_RELUP ?= true
 export EMQX_REL_FORM ?= tgz

+ 2 - 2
apps/emqx/include/emqx_release.hrl

@@ -32,7 +32,7 @@
 %% `apps/emqx/src/bpapi/README.md'
 
 %% Opensource edition
--define(EMQX_RELEASE_CE, "5.8.1").
+-define(EMQX_RELEASE_CE, "5.8.2-beta.1").
 
 %% Enterprise edition
--define(EMQX_RELEASE_EE, "5.8.1").
+-define(EMQX_RELEASE_EE, "5.8.2-beta.1").

+ 6 - 1
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -1243,7 +1243,12 @@ do_enqueue_batch(IsReplay, Session, ClientInfo, StreamKey, Srs0, ItBegin, FetchR
             SchedS = emqx_persistent_session_ds_stream_scheduler:on_enqueue(
                 IsReplay, StreamKey, Srs, S0, SchedS0
             ),
-            {ok, Srs, Session#{stream_scheduler_s := SchedS}};
+            %% FIXME: temporary workaround. Schedule stream renewal
+            %% after encountering end of stream. In the future this
+            %% should be done by the scheduler, immediately:
+            Interval = 1,
+            {ok, Srs,
+                ensure_timer(?TIMER_SHARED_SUB, Interval, Session#{stream_scheduler_s := SchedS})};
         {ok, ItEnd, Messages} ->
             {Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch(
                 IsReplay,

+ 33 - 16
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_bookkeeper.erl

@@ -41,8 +41,20 @@
 %% call/cast/info events
 -record(tally_subs, {}).
 -record(tally_disconnected_sessions, {}).
--record(get_subscription_count, {}).
--record(get_disconnected_session_count, {}).
+
+%%------------------------------------------------------------------------------
+%% Stat records & table
+%%------------------------------------------------------------------------------
+
+-record(stat_field, {name, value}).
+
+-define(tab, ?MODULE).
+-define(subs_count, subs_count).
+-define(disconnected_session_count, disconnected_session_count).
+-define(subs_count(VALUE), #stat_field{name = ?subs_count, value = VALUE}).
+-define(disconnected_session_count(VALUE), #stat_field{
+    name = ?disconnected_session_count, value = VALUE
+}).
 
 %%------------------------------------------------------------------------------
 %% API
@@ -57,7 +69,12 @@ start_link() ->
 get_subscription_count() ->
     case emqx_persistent_message:is_persistence_enabled() of
         true ->
-            gen_server:call(?MODULE, #get_subscription_count{}, infinity);
+            try ets:lookup(?tab, ?subs_count) of
+                [?subs_count(N)] -> N;
+                [] -> 0
+            catch
+                error:badarg -> 0
+            end;
         false ->
             0
     end.
@@ -67,7 +84,12 @@ get_subscription_count() ->
 get_disconnected_session_count() ->
     case emqx_persistent_message:is_persistence_enabled() of
         true ->
-            gen_server:call(?MODULE, #get_disconnected_session_count{}, infinity);
+            try ets:lookup(?tab, ?disconnected_session_count) of
+                [?disconnected_session_count(N)] -> N;
+                [] -> 0
+            catch
+                error:badarg -> 0
+            end;
         false ->
             0
     end.
@@ -80,8 +102,7 @@ init(_Opts) ->
     case emqx_persistent_message:is_persistence_enabled() of
         true ->
             State = #{
-                subs_count => 0,
-                disconnected_session_count => 0
+                tab => ets:new(?tab, [named_table, set, protected, {keypos, #stat_field.name}])
             },
             {ok, State, {continue, #tally_subs{}}};
         false ->
@@ -97,12 +118,6 @@ handle_continue(#tally_disconnected_sessions{}, State0) ->
     ensure_disconnected_sessions_tally_timer(),
     {noreply, State}.
 
-handle_call(#get_subscription_count{}, _From, State) ->
-    #{subs_count := N} = State,
-    {reply, N, State};
-handle_call(#get_disconnected_session_count{}, _From, State) ->
-    #{disconnected_session_count := N} = State,
-    {reply, N, State};
 handle_call(_Call, _From, State) ->
     {reply, {error, bad_call}, State}.
 
@@ -124,13 +139,15 @@ handle_info(_Info, State) ->
 %% Internal fns
 %%------------------------------------------------------------------------------
 
-tally_persistent_subscriptions(State0) ->
+tally_persistent_subscriptions(#{tab := Tab} = State) ->
     N = emqx_persistent_session_ds_state:total_subscription_count(),
-    State0#{subs_count := N}.
+    _ = ets:insert(Tab, ?subs_count(N)),
+    State.
 
-tally_disconnected_persistent_sessions(State0) ->
+tally_disconnected_persistent_sessions(#{tab := Tab} = State) ->
     N = do_tally_disconnected_persistent_sessions(),
-    State0#{disconnected_session_count := N}.
+    _ = ets:insert(Tab, ?disconnected_session_count(N)),
+    State.
 
 ensure_subs_tally_timer() ->
     Timeout = emqx_config:get([durable_sessions, subscription_count_refresh_interval]),

+ 3 - 5
apps/emqx/src/emqx_ws_connection.erl

@@ -440,11 +440,9 @@ websocket_handle({Frame, _}, State) ->
 websocket_info({call, From, Req}, State) ->
     handle_call(From, Req, State);
 websocket_info({cast, rate_limit}, State) ->
-    Stats = #{
-        cnt => emqx_pd:reset_counter(incoming_pubs),
-        oct => emqx_pd:reset_counter(incoming_bytes)
-    },
-    return(postpone({check_gc, Stats}, State));
+    Cnt = emqx_pd:reset_counter(incoming_pubs),
+    Oct = emqx_pd:reset_counter(incoming_bytes),
+    return(postpone({check_gc, Cnt, Oct}, State));
 websocket_info({cast, Msg}, State) ->
     handle_info(Msg, State);
 websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) ->

+ 3 - 2
apps/emqx/test/emqx_ws_connection_SUITE.erl

@@ -378,8 +378,9 @@ t_websocket_info_rate_limit(_) ->
     {ok, _} = websocket_info({cast, rate_limit}, st()),
     ok = timer:sleep(1),
     receive
-        {check_gc, Stats} ->
-            ?assertEqual(#{cnt => 0, oct => 0}, Stats)
+        {check_gc, Cnt, Oct} ->
+            ?assertEqual(0, Cnt),
+            ?assertEqual(0, Oct)
     after 0 -> error(expect_check_gc)
     end.
 

+ 9 - 4
apps/emqx_audit/src/emqx_audit.erl

@@ -40,8 +40,8 @@
 to_audit(#{from := cli, cmd := Cmd, args := Args, duration_ms := DurationMs}) ->
     #?AUDIT{
         operation_id = <<"">>,
-        operation_type = truncate_large_term(Cmd),
-        args = truncate_large_term(Args),
+        operation_type = atom_to_binary(Cmd),
+        args = Args,
         operation_result = <<"">>,
         failure = <<"">>,
         duration_ms = DurationMs,
@@ -67,7 +67,7 @@ to_audit(#{from := erlang_console, function := F, args := Args}) ->
         http_method = <<"">>,
         http_request = <<"">>,
         duration_ms = 0,
-        args = truncate_large_term({F, Args})
+        args = iolist_to_binary(io_lib:format("~p: ~ts", [F, Args]))
     };
 to_audit(#{from := From} = Log) when is_atom(From) ->
     #{
@@ -95,7 +95,7 @@ to_audit(#{from := From} = Log) when is_atom(From) ->
         %% request detail
         http_status_code = StatusCode,
         http_method = Method,
-        http_request = truncate_large_term(Request),
+        http_request = truncate_http_body(Request),
         duration_ms = DurationMs,
         args = <<"">>
     }.
@@ -246,5 +246,10 @@ log_to_file(Level, Meta, #{module := Module} = Handler) ->
             end
     end.
 
+truncate_http_body(Req = #{body := Body}) ->
+    Req#{body => truncate_large_term(Body)};
+truncate_http_body(Req) ->
+    Req.
+
 truncate_large_term(Req) ->
     unicode:characters_to_binary(io_lib:format("~0p", [Req], [{chars_limit, ?CHARS_LIMIT_IN_DB}])).

+ 8 - 3
apps/emqx_audit/test/emqx_audit_api_SUITE.erl

@@ -94,7 +94,12 @@ t_http_api(_) ->
                     <<"operation_id">> := <<"/configs/global_zone">>,
                     <<"source_ip">> := <<"127.0.0.1">>,
                     <<"source">> := _,
-                    <<"http_request">> := _,
+                    <<"http_request">> := #{
+                        <<"method">> := <<"put">>,
+                        <<"body">> := _,
+                        <<"bindings">> := _,
+                        <<"headers">> := #{<<"authorization">> := <<"******">>}
+                    },
                     <<"http_status_code">> := 200,
                     <<"operation_result">> := <<"success">>,
                     <<"operation_type">> := <<"configs">>
@@ -161,7 +166,7 @@ t_cli(_Config) ->
                 <<"operation_id">> := <<"">>,
                 <<"source_ip">> := <<"">>,
                 <<"operation_type">> := <<"conf">>,
-                <<"args">> := <<"[<<\"show\">>,<<\"log\">>]">>,
+                <<"args">> := [<<"show">>, <<"log">>],
                 <<"node">> := _,
                 <<"source">> := <<"">>,
                 <<"http_request">> := <<"">>
@@ -179,7 +184,7 @@ t_cli(_Config) ->
     {ok, Res1} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "from=cli", AuthHeader),
     #{<<"data">> := Data1} = emqx_utils_json:decode(Res1, [return_maps]),
     ?assertMatch(
-        [ShowLogEntry, #{<<"operation_type">> := <<"emqx">>, <<"args">> := <<"[<<\"start\">>]">>}],
+        [ShowLogEntry, #{<<"operation_type">> := <<"emqx">>, <<"args">> := [<<"start">>]}],
         Data1
     ),
     {ok, Res2} = emqx_mgmt_api_test_util:request_api(

+ 3 - 3
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl

@@ -658,10 +658,10 @@ on_add_channel(
     #{driver := thrift},
     _ChannelId,
     #{
-        resource_opts := #{query_mode := async}
+        resource_opts := #{query_mode := QueryMode, batch_size := BatchSize}
     }
-) ->
-    {error, <<"Thrift does not support async mode">>};
+) when QueryMode =:= async; BatchSize > 1 ->
+    {error, <<"Thrift does not support async or batch mode">>};
 on_add_channel(
     _InstanceId,
     #{driver := thrift, channels := Channels} = OldState,

+ 1 - 0
apps/emqx_cluster_link/src/emqx_cluster_link.erl

@@ -35,6 +35,7 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_hooks.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/trace.hrl").
 
 %%--------------------------------------------------------------------
 %% emqx_external_broker API

+ 30 - 26
apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl

@@ -61,7 +61,7 @@
 -define(ERROR_DELAY, 200).
 
 -define(RECONNECT_TIMEOUT, 5_000).
--define(ACTOR_REINIT_TIMEOUT, 7000).
+-define(ACTOR_INIT_TIMEOUT, 7000).
 
 -define(CLIENT_SUFFIX, ":routesync:").
 -define(PS_CLIENT_SUFFIX, ":routesync-ps:").
@@ -402,16 +402,16 @@ handle_info(
     end;
 handle_info({timeout, TRef, reconnect}, St = #st{reconnect_timer = TRef}) ->
     {noreply, process_connect(St#st{reconnect_timer = undefined})};
-handle_info({timeout, TRef, actor_reinit}, St = #st{actor_init_timer = TRef}) ->
-    ?SLOG(error, #{
-        msg => "remote_actor_init_timeout",
-        target_cluster => St#st.target,
-        actor => St#st.actor
+handle_info({timeout, TRef, actor_init_timeout}, St0 = #st{actor_init_timer = TRef}) ->
+    ?tp(error, "remote_actor_init_timeout", #{
+        target_cluster => St0#st.target,
+        actor => St0#st.actor
     }),
     Reason = init_timeout,
-    _ = maybe_alarm(Reason, St),
-    {noreply,
-        init_remote_actor(St#st{actor_init_timer = undefined, status = disconnected, error = Reason})};
+    _ = maybe_alarm(Reason, St0),
+    St1 = St0#st{actor_init_timer = undefined, status = disconnected, error = Reason},
+    St = stop_link_client(St1),
+    {noreply, ensure_reconnect_timer(St)};
 handle_info({timeout, TRef, _Heartbeat}, St = #st{heartbeat_timer = TRef}) ->
     {noreply, process_heartbeat(St#st{heartbeat_timer = undefined})};
 %% Stale timeout.
@@ -436,7 +436,7 @@ process_connect(St = #st{target = TargetCluster, actor = Actor, link_conf = Conf
     end.
 
 init_remote_actor(
-    St = #st{target = TargetCluster, client = ClientPid, actor = Actor, incarnation = Incr}
+    St0 = #st{target = TargetCluster, client = ClientPid, actor = Actor, incarnation = Incr}
 ) ->
     ReqId = emqx_utils_conv:bin(emqx_utils:gen_id(16)),
     %% TODO: handle subscribe errors
@@ -447,22 +447,20 @@ init_remote_actor(
         ),
         ClientPid
     ),
-    St1 =
-        case Res of
-            ok ->
-                St#st{status = connecting};
-            {error, Reason} ->
-                ?SLOG(error, #{
-                    msg => "cluster_link_init_failed",
-                    reason => Reason,
-                    target_cluster => TargetCluster,
-                    actor => Actor
-                }),
-                _ = maybe_alarm(Reason, St),
-                St#st{error = Reason, status = disconnected}
-        end,
-    TRef = erlang:start_timer(?ACTOR_REINIT_TIMEOUT, self(), actor_reinit),
-    St1#st{actor_init_req_id = ReqId, actor_init_timer = TRef}.
+    case Res of
+        ok ->
+            TRef = erlang:start_timer(?ACTOR_INIT_TIMEOUT, self(), actor_init_timeout),
+            St0#st{status = connecting, actor_init_req_id = ReqId, actor_init_timer = TRef};
+        {error, Reason} ->
+            ?tp(error, "cluster_link_init_failed", #{
+                reason => Reason,
+                target_cluster => TargetCluster,
+                actor => Actor
+            }),
+            _ = maybe_alarm(Reason, St0),
+            St = stop_link_client(St0#st{error = Reason, status = disconnected}),
+            ensure_reconnect_timer(St)
+    end.
 
 post_actor_init(
     St = #st{client = ClientPid, target = TargetCluster, actor = Actor, incarnation = Incr},
@@ -505,6 +503,12 @@ handle_client_down(
     NSt = cancel_heartbeat(St),
     process_connect(NSt#st{client = undefined, error = Reason, status = connecting}).
 
+stop_link_client(#st{client = ClientPid} = St0) ->
+    ?tp("clink_stop_link_client", #{}),
+    ok = emqtt:stop(ClientPid),
+    flush_link_signal(ClientPid),
+    St0#st{client = undefined}.
+
 process_bootstrap(St = #st{bootstrapped = false}, _NeedBootstrap) ->
     run_bootstrap(St);
 process_bootstrap(St = #st{bootstrapped = true}, NeedBootstrap) ->

+ 110 - 5
apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl

@@ -12,6 +12,8 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-import(emqx_common_test_helpers, [on_exit/1]).
+
 -define(ON(NODE, DO), erpc:call(NODE, fun() -> DO end)).
 
 %%
@@ -63,8 +65,13 @@ mk_source_cluster(BaseName, Config) ->
         "\n     topics = []"
         "\n   }"
         "\n ]}",
-    SourceApps1 = [{emqx_conf, combine([conf_log(), SourceConf])}],
-    SourceApps2 = [{emqx_conf, combine([conf_log(), conf_mqtt_listener(41883), SourceConf])}],
+    ExtraApps = proplists:get_value(extra_apps, Config, []),
+    SourceApps1 = [{emqx_conf, combine([conf_log(), SourceConf])}, emqx | ExtraApps],
+    SourceApps2 = [
+        {emqx_conf, combine([conf_log(), SourceConf])},
+        {emqx, conf_mqtt_listener(41883)}
+        | ExtraApps
+    ],
     emqx_cth_cluster:mk_nodespecs(
         [
             {mk_nodename(BaseName, s1), #{apps => SourceApps1}},
@@ -85,7 +92,10 @@ mk_target_cluster(BaseName, Config) ->
         "\n     topics = [\"#\"]"
         "\n   }"
         "\n ]}",
-    TargetApps1 = [{emqx_conf, combine([conf_log(), conf_mqtt_listener(31883), TargetConf])}],
+    TargetApps1 = [
+        {emqx_conf, combine([conf_log(), TargetConf])},
+        {emqx, conf_mqtt_listener(31883)}
+    ],
     TargetApps2 = [{emqx_conf, combine([conf_log(), TargetConf])}],
     emqx_cth_cluster:mk_nodespecs(
         [
@@ -110,13 +120,13 @@ combine([Entry | Rest]) ->
     lists:foldl(fun emqx_cth_suite:merge_config/2, Entry, Rest).
 
 start_cluster_link(Nodes, Config) ->
-    [{ok, Apps}] = lists:usort(
+    Results = lists:usort(
         erpc:multicall(Nodes, emqx_cth_suite, start_apps, [
             [emqx_cluster_link],
             #{work_dir => emqx_cth_suite:work_dir(Config)}
         ])
     ),
-    Apps.
+    lists:flatmap(fun({ok, Apps}) -> Apps end, Results).
 
 stop_cluster_link(Config) ->
     Apps = ?config(tc_apps, Config),
@@ -315,6 +325,101 @@ t_disconnect_on_errors(Config) ->
     ok = emqtt:stop(SC1),
     ok.
 
+%% Checks that if a timeout occurs during actor state initialization, we close the
+%% (potentially unhealthy) connection and start anew.
+t_restart_connection_on_actor_init_timeout('init', Config0) ->
+    ExtraApps = [{emqx_auth, "authorization.no_match = deny"}],
+    SourceNodesSpec = mk_source_cluster(?FUNCTION_NAME, [{extra_apps, ExtraApps} | Config0]),
+    TargetNodesSpec = mk_target_cluster(?FUNCTION_NAME, Config0),
+    ok = snabbkaffe:start_trace(),
+    [
+        {source_nodes_spec, SourceNodesSpec},
+        {target_nodes_spec, TargetNodesSpec}
+        | Config0
+    ];
+t_restart_connection_on_actor_init_timeout('end', _Config) ->
+    ok = snabbkaffe:stop(),
+    emqx_common_test_helpers:call_janitor(),
+    ok.
+t_restart_connection_on_actor_init_timeout(Config) ->
+    SourceNodesSpec = ?config(source_nodes_spec, Config),
+    TargetNodesSpec = ?config(target_nodes_spec, Config),
+    SourceNodes = [SN | _] = emqx_cth_cluster:start(SourceNodesSpec),
+    on_exit(fun() -> ok = emqx_cth_cluster:stop(SourceNodes) end),
+
+    %% Simulate a poorly configured node that'll reject the actor init ack
+    %% message, making the initialization time out.
+    ok = ?ON(
+        SN,
+        emqx_authz_test_lib:setup_config(
+            #{
+                <<"type">> => <<"file">>,
+                <<"enable">> => true,
+                <<"rules">> =>
+                    <<
+                        "{deny, all, subscribe, [\"#\"]}.\n"
+                        "{allow, all, publish, [\"$LINK/#\", \"#\"]}."
+                    >>
+            },
+            #{}
+        )
+    ),
+    %% For some reason, it's fruitless to try to set this config in the app specs....
+    {ok, _} = ?ON(
+        SN,
+        emqx_conf:update([authorization, no_match], deny, #{override_to => cluster})
+    ),
+
+    TargetNodes = emqx_cth_cluster:start(TargetNodesSpec),
+    on_exit(fun() -> ok = emqx_cth_cluster:stop(TargetNodes) end),
+    ?check_trace(
+        #{timetrap => 30_000},
+        begin
+            ct:pal("starting cluster link"),
+            ?wait_async_action(
+                start_cluster_link(SourceNodes ++ TargetNodes, Config),
+                #{?snk_kind := "remote_actor_init_timeout"}
+            ),
+
+            %% Fix the authorization config, it should reconnect.
+            ct:pal("fixing config"),
+            ?wait_async_action(
+                begin
+                    ok = ?ON(
+                        SN,
+                        emqx_authz_test_lib:setup_config(
+                            #{
+                                <<"type">> => <<"file">>,
+                                <<"enable">> => true,
+                                <<"rules">> => <<"{allow, all}.">>
+                            },
+                            #{}
+                        )
+                    ),
+                    {ok, _} = ?ON(
+                        SN,
+                        emqx_conf:update([authorization, no_match], allow, #{override_to => cluster})
+                    )
+                end,
+                #{?snk_kind := clink_route_bootstrap_complete}
+            ),
+
+            ok
+        end,
+        fun(Trace) ->
+            ?assert(
+                ?strict_causality(
+                    #{?snk_kind := "remote_actor_init_timeout", ?snk_meta := #{node := _N1}},
+                    #{?snk_kind := "clink_stop_link_client", ?snk_meta := #{node := _N2}},
+                    _N1 =:= _N2,
+                    Trace
+                )
+            ),
+            ok
+        end
+    ),
+    ok.
+
 %%
 
 maybe_shared_topic(true = _IsShared, Topic) ->

+ 27 - 13
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -28,6 +28,8 @@
 -include_lib("emqx_utils/include/emqx_utils_api.hrl").
 -include_lib("emqx/include/emqx_durable_session_metadata.hrl").
 
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
 -include("emqx_mgmt.hrl").
 
 %% API
@@ -1069,7 +1071,7 @@ run_filters(Rows, QString0) ->
     FuzzyFilterFn = fuzzy_filter_fun(FuzzyQString),
     lists:filter(
         fun(Row) ->
-            does_row_match_query(Row, QString) andalso
+            does_offline_row_match_query(Row, QString) andalso
                 does_row_match_fuzzy_filter(Row, FuzzyFilterFn)
         end,
         Rows
@@ -1722,45 +1724,57 @@ ms(created_at, X) ->
 %%
 %% These functions are used with clients_v2 API.
 
-does_chan_info_match({ip_address, '=:=', IpAddress}, #{conninfo := #{peername := {IpAddress, _}}}) ->
+does_offline_chan_info_match({ip_address, '=:=', IpAddress}, #{
+    conninfo := #{peername := {IpAddress, _}}
+}) ->
     true;
-does_chan_info_match({conn_state, '=:=', State}, #{conn_state := State}) ->
+%% This matchers match only offline clients, because online clients are listed directly from
+%% channel manager's ETS tables. So we succeed here only if offline conn_state is requested.
+does_offline_chan_info_match({conn_state, '=:=', disconnected}, _) ->
     true;
-does_chan_info_match({clean_start, '=:=', CleanStart}, #{conninfo := #{clean_start := CleanStart}}) ->
+does_offline_chan_info_match({conn_state, '=:=', _}, _) ->
+    false;
+does_offline_chan_info_match({clean_start, '=:=', CleanStart}, #{
+    conninfo := #{clean_start := CleanStart}
+}) ->
     true;
-does_chan_info_match({proto_ver, '=:=', ProtoVer}, #{conninfo := #{proto_ver := ProtoVer}}) ->
+does_offline_chan_info_match({proto_ver, '=:=', ProtoVer}, #{conninfo := #{proto_ver := ProtoVer}}) ->
     true;
-does_chan_info_match({connected_at, '>=', ConnectedAtFrom}, #{
+does_offline_chan_info_match({connected_at, '>=', ConnectedAtFrom}, #{
     conninfo := #{connected_at := ConnectedAt}
 }) when
     ConnectedAt >= ConnectedAtFrom
 ->
     true;
-does_chan_info_match({connected_at, '=<', ConnectedAtTo}, #{
+does_offline_chan_info_match({connected_at, '=<', ConnectedAtTo}, #{
     conninfo := #{connected_at := ConnectedAt}
 }) when
     ConnectedAt =< ConnectedAtTo
 ->
     true;
-does_chan_info_match({created_at, '>=', CreatedAtFrom}, #{session := #{created_at := CreatedAt}}) when
+does_offline_chan_info_match({created_at, '>=', CreatedAtFrom}, #{
+    session := #{created_at := CreatedAt}
+}) when
     CreatedAt >= CreatedAtFrom
 ->
     true;
-does_chan_info_match({created_at, '=<', CreatedAtTo}, #{session := #{created_at := CreatedAt}}) when
+does_offline_chan_info_match({created_at, '=<', CreatedAtTo}, #{
+    session := #{created_at := CreatedAt}
+}) when
     CreatedAt =< CreatedAtTo
 ->
     true;
-does_chan_info_match(_, _) ->
+does_offline_chan_info_match(_, _) ->
     false.
 
-does_row_match_query(
+does_offline_row_match_query(
     {_Id, #{metadata := #{offline_info := #{chan_info := ChanInfo}}}}, CompiledQueryString
 ) ->
     lists:all(
-        fun(FieldQuery) -> does_chan_info_match(FieldQuery, ChanInfo) end,
+        fun(FieldQuery) -> does_offline_chan_info_match(FieldQuery, ChanInfo) end,
         CompiledQueryString
     );
-does_row_match_query(_, _) ->
+does_offline_row_match_query(_, _) ->
     false.
 
 %%--------------------------------------------------------------------

+ 15 - 0
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -1953,6 +1953,21 @@ t_list_clients_v2_regular_filters(Config) ->
                 Res2
             ),
 
+            QueryParams3 = [
+                {"limit", "100"},
+                {"conn_state", "connected"}
+            ],
+            Res3 = list_all_v2(QueryParams3, Config),
+            ?assertMatch(
+                [
+                    #{
+                        <<"data">> := [],
+                        <<"meta">> := #{<<"count">> := 0}
+                    }
+                ],
+                Res3
+            ),
+
             ?tp(warning, destroy_session, #{clientid => ClientId1}),
             ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId1])
         end,

+ 1 - 1
apps/emqx_retainer/src/emqx_retainer_cli.erl

@@ -125,7 +125,7 @@ count() ->
 
 topic(Start, Len) ->
     count(),
-    {ok, _HasNext, Messages} = emqx_retainer:page_read(<<"#">>, Start, Len),
+    {ok, _HasNext, Messages} = emqx_retainer:page_read(undefined, Start, Len),
     [?PRINT("~ts~n", [emqx_message:topic(M)]) || M <- Messages],
     ok.
 

+ 14 - 1
apps/emqx_retainer/src/emqx_retainer_mnesia.erl

@@ -310,7 +310,7 @@ page_read(_State, Topic, Deadline, Page, Limit) ->
     S0 =
         case Topic of
             undefined ->
-                msg_stream(search_stream(undefined, ['#'], Deadline));
+                msg_stream(all_stream(Deadline));
             _ ->
                 Tokens = topic_to_tokens(Topic),
                 msg_stream(search_stream(Tokens, Deadline))
@@ -411,6 +411,15 @@ search_stream(Tokens, Now) ->
     Index = emqx_retainer_index:select_index(Tokens, Indices),
     search_stream(Index, Tokens, Now).
 
+all_stream(Now) ->
+    Ms = make_message_match_spec(Now),
+    emqx_utils_stream:ets(
+        fun
+            (undefined) -> ets:select(?TAB_MESSAGE, Ms, 1);
+            (Cont) -> ets:select(Cont)
+        end
+    ).
+
 search_stream(undefined, Tokens, Now) ->
     Ms = make_message_match_spec(Tokens, Now),
     MsgStream = emqx_utils_stream:ets(
@@ -504,6 +513,10 @@ read_messages(Topic) ->
             end
     end.
 
+make_message_match_spec(NowMs) ->
+    MsHd = #retained_message{topic = '_', msg = '_', expiry_time = '$3'},
+    [{MsHd, [{'orelse', {'=:=', '$3', 0}, {'>', '$3', NowMs}}], ['$_']}].
+
 make_message_match_spec(Tokens, NowMs) ->
     Cond = emqx_retainer_index:condition(Tokens),
     MsHd = #retained_message{topic = Cond, msg = '_', expiry_time = '$3'},

+ 17 - 0
apps/emqx_retainer/test/emqx_retainer_api_SUITE.erl

@@ -424,6 +424,23 @@ t_match_and_clean(Config) ->
     {ok, LookupJson2} = request_api(get, API),
     ?assertMatch(#{data := []}, decode_json(LookupJson2)).
 
+%% Checks that we can see `$SYS' messages in the API.
+%% https://emqx.atlassian.net/browse/EMQX-13399
+t_retained_sys_messages(_Config) ->
+    Msg0 = emqx_message:make(emqx_sys, <<"$SYS/brokers">>, atom_to_binary(node())),
+    Msg = emqx_message:set_flags(#{sys => true, retain => true}, Msg0),
+    _ = emqx:publish(Msg),
+    API = api_path(["mqtt", "retainer", "messages"]),
+    {ok, LookupJson} = request_api(get, API, "", auth_header_()),
+    ?assertMatch(
+        #{
+            data := [_ | _],
+            meta := #{count := N}
+        } when N > 0,
+        decode_json(LookupJson)
+    ),
+    ok.
+
 %%--------------------------------------------------------------------
 %% Internal funcs
 %%--------------------------------------------------------------------

+ 6 - 0
build

@@ -57,6 +57,12 @@ export PKG_VSN
 
 SYSTEM="$(./scripts/get-distro.sh)"
 
+if [[ $SYSTEM == "el7" ]];
+then
+    echo "WARNING: NO SECURITY UPDATES for CentOS 7 QUIC transport"
+    export QUICER_TLS_VER=openssl
+fi
+
 ARCH="$(uname -m)"
 case "$ARCH" in
     x86_64)

+ 2 - 0
changes/ce/feat-13984.en.md

@@ -1 +1,3 @@
 quicer NIF lib now links to sys libcrypto.
+
+NOTE: this change does not apply to RHEL 7/CentOS 7 because they are still using openssl 1.0.x.

+ 1 - 0
changes/ce/fix-14151.en.md

@@ -0,0 +1 @@
+In `/clients_v2` API, correctly handle `conn_state` filtering selector for offline clients with durable sessions. Previously, offline clients with durable sessions could be selected with `conn_state=connected` selector value.

+ 1 - 0
changes/ce/fix-14184.en.md

@@ -0,0 +1 @@
+Fix the display of total and live connection counts in the dashboard. Previously, the counts could be zero and slowly updated if there were a lot of persistent sessions.

+ 1 - 0
changes/ce/fix-14201.en.md

@@ -0,0 +1 @@
+Stop displaying a check_gc warning when the WebSocket connection encounters a rate limit.

changes/ee/perf-14077.en.md → changes/ee/perf-14152.en.md


+ 2 - 2
deploy/charts/emqx-enterprise/Chart.yaml

@@ -14,8 +14,8 @@ type: application
 
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
-version: 5.8.1
+version: 5.8.2-beta.1
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
-appVersion: 5.8.1
+appVersion: 5.8.2-beta.1

+ 2 - 2
deploy/charts/emqx/Chart.yaml

@@ -14,8 +14,8 @@ type: application
 
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
-version: 5.8.1
+version: 5.8.2-beta.1
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
-appVersion: 5.8.1
+appVersion: 5.8.2-beta.1