Преглед изворни кода

Merge remote-tracking branch 'origin/release-58' into 20241202-r584-sync-r58

Thales Macedo Garitezi пре 1 година
родитељ
комит
50d18cf0e2
52 измењених фајлова са 875 додато и 206 уклоњено
  1. 6 0
      .ci/docker-compose-file/toxiproxy.json
  2. 2 2
      apps/emqx/rebar.config
  3. 2 8
      apps/emqx/rebar.config.script
  4. 4 1
      apps/emqx/src/config/emqx_config_logger.erl
  5. 49 15
      apps/emqx/src/emqx_channel.erl
  6. 66 1
      apps/emqx/src/emqx_cm.erl
  7. 15 0
      apps/emqx/src/emqx_external_trace.erl
  8. 1 1
      apps/emqx/test/emqx_common_test_helpers.erl
  9. 2 3
      apps/emqx/test/emqx_listeners_SUITE.erl
  10. 1 1
      apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl
  11. 1 1
      apps/emqx/test/emqx_persistent_session_SUITE.erl
  12. 2 2
      apps/emqx/test/emqx_takeover_SUITE.erl
  13. 1 1
      apps/emqx_auth_http/test/emqx_authn_http_SUITE.erl
  14. 1 1
      apps/emqx_bridge/src/emqx_bridge.app.src
  15. 11 2
      apps/emqx_bridge/src/emqx_bridge_v2.erl
  16. 10 0
      apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl
  17. 1 18
      apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl
  18. 1 0
      apps/emqx_bridge_mongodb/docker-ct
  19. 1 1
      apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src
  20. 7 2
      apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl
  21. 148 13
      apps/emqx_bridge_mongodb/test/emqx_bridge_v2_mongodb_SUITE.erl
  22. 62 4
      apps/emqx_conf/src/emqx_conf_schema.erl
  23. 31 1
      apps/emqx_conf/test/emqx_conf_schema_tests.erl
  24. 46 6
      apps/emqx_ds_builtin_raft/src/emqx_ds_builtin_raft_db_sup.erl
  25. 30 22
      apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl
  26. 17 5
      apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl
  27. 1 1
      apps/emqx_enterprise/src/emqx_enterprise.app.src
  28. 1 5
      apps/emqx_enterprise/src/emqx_enterprise_schema.erl
  29. 2 2
      apps/emqx_enterprise/test/emqx_enterprise_schema_SUITE.erl
  30. 1 1
      apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl
  31. 1 1
      apps/emqx_mysql/mix.exs
  32. 1 1
      apps/emqx_mysql/rebar.config
  33. 2 2
      apps/emqx_node_rebalance/test/emqx_node_rebalance_evacuation_SUITE.erl
  34. 4 0
      apps/emqx_opentelemetry/include/emqx_otel_trace.hrl
  35. 70 0
      apps/emqx_opentelemetry/src/emqx_otel_trace.erl
  36. 9 3
      apps/emqx_opentelemetry/src/sampler/emqx_otel_sampler.erl
  37. 27 21
      apps/emqx_resource/src/emqx_resource.erl
  38. 13 0
      apps/emqx_resource/src/emqx_resource_manager.erl
  39. 6 4
      apps/emqx_resource/src/emqx_resource_metrics.erl
  40. 3 4
      apps/emqx_resource/src/emqx_resource_pool.erl
  41. 1 1
      apps/emqx_retainer/rebar.config
  42. 82 22
      apps/emqx_utils/src/emqx_metrics_worker.erl
  43. 103 0
      apps/emqx_utils/test/emqx_metrics_worker_SUITE.erl
  44. 11 0
      changes/ce/fix-14226.en.md
  45. 1 0
      changes/ce/fix-14266.en.md
  46. 8 0
      changes/ce/fix-14289.en.md
  47. 1 0
      changes/ce/fix-14296.en.md
  48. 1 0
      changes/ee/fix-14294.md
  49. 1 0
      changes/ee/fix-14298.en.md
  50. 4 11
      mix.exs
  51. 2 2
      rebar.config
  52. 1 14
      rebar.config.erl

+ 6 - 0
.ci/docker-compose-file/toxiproxy.json

@@ -251,5 +251,11 @@
     "listen": "0.0.0.0:8362",
     "upstream": "datalayers_tls:8362",
     "enabled": true
+  },
+  {
+    "name": "mongo_single_tcp",
+    "listen": "0.0.0.0:27017",
+    "upstream": "mongo:27017",
+    "enabled": true
   }
 ]

+ 2 - 2
apps/emqx/rebar.config

@@ -46,7 +46,7 @@
             {meck, "0.9.2"},
             {proper, "1.4.0"},
             {bbmustache, "1.10.0"},
-            {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.13.0"}}}
+            {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.13.5"}}}
         ]},
         {extra_src_dirs, [
             {"test", [recursive]},
@@ -58,7 +58,7 @@
             {meck, "0.9.2"},
             {proper, "1.4.0"},
             {bbmustache, "1.10.0"},
-            {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.13.0"}}}
+            {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.13.5"}}}
         ]},
         {extra_src_dirs, [{"test", [recursive]}]}
     ]}

+ 2 - 8
apps/emqx/rebar.config.script

@@ -12,20 +12,14 @@ IsWin32 = fun() ->
     win32 =:= element(1, os:type())
 end,
 
-IsMacOS = fun() ->
-    {unix, darwin} =:= os:type()
-end,
-
 IsQuicSupp = fun() ->
     not (IsCentos6() orelse IsWin32() orelse
-        IsMacOS() orelse
-        false =/= os:getenv("BUILD_WITHOUT_QUIC")) orelse
-        "1" == os:getenv("BUILD_WITH_QUIC")
+        "1" =:= os:getenv("BUILD_WITHOUT_QUIC"))
 end,
 
 Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}},
 Quicer =
-    {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.1.9"}}}.
+    {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.1.10"}}}.
 
 Dialyzer = fun(Config) ->
     {dialyzer, OldDialyzerConfig} = lists:keyfind(dialyzer, 1, Config),

+ 4 - 1
apps/emqx/src/config/emqx_config_logger.erl

@@ -169,7 +169,7 @@ tr_file_handler({HandlerName, SubConf}) ->
         level => conf_get("level", SubConf),
         config => HandlerConf#{
             type => Type,
-            file => FilePath,
+            file => emqx_schema:naive_env_interpolation(FilePath),
             max_no_files => RotationCount,
             max_no_bytes => RotationSize
         },
@@ -187,6 +187,9 @@ logger_file_handlers(Conf) ->
     logger_handlers(Handlers).
 
 logger_handlers(Handlers) ->
+    keep_only_enabeld(Handlers).
+
+keep_only_enabeld(Handlers) ->
     lists:filter(
         fun({_Name, Handler}) ->
             conf_get("enable", Handler, false)

+ 49 - 15
apps/emqx/src/emqx_channel.erl

@@ -211,6 +211,8 @@ info(clientid, #channel{clientinfo = ClientInfo}) ->
     maps:get(clientid, ClientInfo, undefined);
 info(username, #channel{clientinfo = ClientInfo}) ->
     maps:get(username, ClientInfo, undefined);
+info(is_bridge, #channel{clientinfo = ClientInfo}) ->
+    maps:get(is_bridge, ClientInfo, undefined);
 info(session, #channel{session = Session}) ->
     maybe_apply(fun emqx_session:info/1, Session);
 info({session, Info}, #channel{session = Session}) ->
@@ -511,8 +513,11 @@ handle_in(
         client_disconnect,
         Packet,
         (basic_trace_attrs(Channel))#{
-            'client.peername' => emqx_utils:ntoa(info(peername, Channel)),
+            'client.proto_name' => info(proto_name, Channel),
+            'client.proto_ver' => info(proto_ver, Channel),
+            'client.is_bridge' => info(is_bridge, Channel),
             'client.sockname' => emqx_utils:ntoa(info(sockname, Channel)),
+            'client.peername' => emqx_utils:ntoa(info(peername, Channel)),
             'client.disconnect.reason_code' => emqx_packet:info(reason_code, _PktVar)
         },
         fun(PacketWithTrace) -> process_disconnect(PacketWithTrace, Channel) end
@@ -1498,12 +1503,26 @@ handle_call(Req, Channel) ->
     ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}.
 
 handle_info({subscribe, TopicFilters}, Channel) ->
-    NTopicFilters = enrich_subscribe(TopicFilters, Channel),
-    {_TopicFiltersWithRC, NChannel} = post_process_subscribe(NTopicFilters, Channel),
-    {ok, NChannel};
+    ?EXT_TRACE_WITH_PROCESS_FUN(
+        broker_subscribe,
+        [],
+        maps:merge(basic_trace_attrs(Channel), topic_filters_attrs({subscribe, TopicFilters})),
+        fun([]) ->
+            NTopicFilters = enrich_subscribe(TopicFilters, Channel),
+            {_TopicFiltersWithRC, NChannel} = post_process_subscribe(NTopicFilters, Channel),
+            {ok, NChannel}
+        end
+    );
 handle_info({unsubscribe, TopicFilters}, Channel) ->
-    {_RC, NChannel} = post_process_unsubscribe(TopicFilters, #{}, Channel),
-    {ok, NChannel};
+    ?EXT_TRACE_WITH_PROCESS_FUN(
+        broker_unsubscribe,
+        [],
+        maps:merge(basic_trace_attrs(Channel), topic_filters_attrs({unsubscribe, TopicFilters})),
+        fun([]) ->
+            {_RC, NChannel} = post_process_unsubscribe(TopicFilters, #{}, Channel),
+            {ok, NChannel}
+        end
+    );
 handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
     shutdown(Reason, Channel);
 handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) ->
@@ -3175,23 +3194,38 @@ basic_trace_attrs(Channel) ->
     }.
 
 topic_filters_attrs(?PACKET(?SUBSCRIBE, PktVar)) ->
-    {TFs, SubOpts} = lists:foldl(
-        fun({Topic, SubOpts}, {AccTFs, AccSubOpts}) ->
-            {[emqx_topic:maybe_format_share(Topic) | AccTFs], [SubOpts | AccSubOpts]}
-        end,
-        {[], []},
-        emqx_packet:info(topic_filters, PktVar)
-    ),
+    {TFs, SubOpts} = do_topic_filters_attrs(subscribe, emqx_packet:info(topic_filters, PktVar)),
     #{
         'client.subscribe.topics' => emqx_utils_json:encode(lists:reverse(TFs)),
         'client.subscribe.sub_opts' => emqx_utils_json:encode(lists:reverse(SubOpts))
     };
 topic_filters_attrs(?PACKET(?UNSUBSCRIBE, PktVar)) ->
+    {TFs, _} = do_topic_filters_attrs(unsubscribe, emqx_packet:info(topic_filters, PktVar)),
+    #{'client.unsubscribe.topics' => emqx_utils_json:encode(TFs)};
+topic_filters_attrs({subscribe, TopicFilters}) ->
+    {TFs, SubOpts} = do_topic_filters_attrs(subscribe, TopicFilters),
+    #{
+        'broker.subscribe.topics' => emqx_utils_json:encode(lists:reverse(TFs)),
+        'broker.subscribe.sub_opts' => emqx_utils_json:encode(lists:reverse(SubOpts))
+    };
+topic_filters_attrs({unsubscribe, TopicFilters}) ->
+    {TFs, _} = do_topic_filters_attrs(unsubscribe, [TF || {TF, _} <- TopicFilters]),
+    #{'broker.unsubscribe.topics' => emqx_utils_json:encode(TFs)}.
+
+do_topic_filters_attrs(subscribe, TopicFilters) ->
+    {_TFs, _SubOpts} = lists:foldl(
+        fun({Topic, SubOpts}, {AccTFs, AccSubOpts}) ->
+            {[emqx_topic:maybe_format_share(Topic) | AccTFs], [SubOpts | AccSubOpts]}
+        end,
+        {[], []},
+        TopicFilters
+    );
+do_topic_filters_attrs(unsubscribe, TopicFilters) ->
     TFs = [
         emqx_topic:maybe_format_share(Name)
-     || Name <- emqx_packet:info(topic_filters, PktVar)
+     || Name <- TopicFilters
     ],
-    #{'client.unsubscribe.topics' => emqx_utils_json:encode(TFs)}.
+    {TFs, undefined}.
 
 authn_attrs({continue, _Properties, _Channel}) ->
     %% TODO

+ 66 - 1
apps/emqx/src/emqx_cm.erl

@@ -22,6 +22,8 @@
 -include("emqx_cm.hrl").
 -include("logger.hrl").
 -include("types.hrl").
+-include("emqx_mqtt.hrl").
+-include("emqx_external_trace.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("stdlib/include/qlc.hrl").
 -include_lib("stdlib/include/ms_transform.hrl").
@@ -457,6 +459,14 @@ discard_session(ClientId) when is_binary(ClientId) ->
 when
     Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'} | takeover_kick.
 request_stepdown(Action, ConnMod, Pid) ->
+    ?EXT_TRACE_WITH_PROCESS_FUN(
+        broker_disconnect,
+        [],
+        maps:merge(basic_trace_attrs(Pid), action_to_reason(Action)),
+        fun([]) -> do_request_stepdown(Action, ConnMod, Pid) end
+    ).
+
+do_request_stepdown(Action, ConnMod, Pid) ->
     Timeout =
         case Action == kick orelse Action == discard of
             true -> ?T_KICK;
@@ -696,7 +706,9 @@ lookup_channels(local, ClientId) ->
     [ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)].
 
 -spec lookup_client(
-    {clientid, emqx_types:clientid()} | {username, emqx_types:username()} | {chan_pid, chan_pid()}
+    {clientid, emqx_types:clientid()}
+    | {username, emqx_types:username()}
+    | {chan_pid, chan_pid()}
 ) ->
     [channel_info()].
 lookup_client({username, Username}) ->
@@ -842,3 +854,56 @@ kick_session_chans(ClientId, ChanPids) ->
             ok
     end,
     lists:foreach(fun(Pid) -> kick_session(ClientId, Pid) end, ChanPids).
+
+-if(?EMQX_RELEASE_EDITION == ee).
+
+basic_trace_attrs(Pid) ->
+    %% io:format("lookup_client({chan_pid, Pid}): ~p", [lookup_client({chan_pid, Pid})]),
+    case lookup_client({chan_pid, Pid}) of
+        [] ->
+            #{'channel.pid' => iolist_to_binary(io_lib:format("~p", [Pid]))};
+        [{_Chan, #{clientinfo := ClientInfo, conninfo := ConnInfo}, _Stats}] ->
+            #{
+                'client.clientid' => maps:get(clientid, ClientInfo, undefined),
+                'client.username' => maps:get(username, ClientInfo, undefined),
+                'client.proto_name' => maps:get(proto_name, ConnInfo, undefined),
+                'client.proto_ver' => maps:get(proto_ver, ConnInfo, undefined),
+                'client.is_bridge' => maps:get(is_bridge, ClientInfo, undefined),
+                'client.sockname' => ntoa(maps:get(sockname, ConnInfo, undefined)),
+                'client.peername' => ntoa(maps:get(peername, ConnInfo, undefined))
+            };
+        _ ->
+            #{}
+    end.
+
+action_to_reason(Action) when
+    Action =:= kick orelse
+        Action =:= takeover_kick
+->
+    #{
+        'client.disconnect.reason_code' => ?RC_ADMINISTRATIVE_ACTION,
+        'client.disconnect.reason' => kick
+    };
+action_to_reason(discard) ->
+    #{
+        'client.disconnect.reason_code' => ?RC_SESSION_TAKEN_OVER,
+        'client.disconnect.reason' => discard
+    };
+action_to_reason({takeover, 'begin'}) ->
+    #{
+        'client.disconnect.reason_code' => ?RC_SESSION_TAKEN_OVER,
+        'client.disconnect.reason' => takeover_begin
+    };
+action_to_reason({takeover, 'end'}) ->
+    #{
+        'client.disconnect.reason_code' => ?RC_SESSION_TAKEN_OVER,
+        'disconnect.reason' => takeover_end
+    }.
+
+ntoa(undefined) ->
+    undefined;
+ntoa(IpPort) ->
+    emqx_utils:ntoa(IpPort).
+
+-else.
+-endif.

+ 15 - 0
apps/emqx/src/emqx_external_trace.erl

@@ -56,6 +56,21 @@
     InitAttrs :: attrs(),
     Res :: term().
 
+-callback broker_disconnect(Any, InitAttrs, fun((Any) -> Res)) -> Res when
+    Any :: any(),
+    InitAttrs :: attrs(),
+    Res :: any().
+
+-callback broker_subscribe(Any, InitAttrs, fun((Any) -> Res)) -> Res when
+    Any :: any(),
+    InitAttrs :: attrs(),
+    Res :: any().
+
+-callback broker_unsubscribe(Any, InitAttrs, fun((Any) -> Res)) -> Res when
+    Any :: any(),
+    InitAttrs :: attrs(),
+    Res :: any().
+
 %% Message Processing Spans
 %% PUBLISH(form Publisher) -> ROUTE -> FORWARD(optional) -> DELIVER(to Subscribers)
 -callback client_publish(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when

+ 1 - 1
apps/emqx/test/emqx_common_test_helpers.erl

@@ -1004,7 +1004,7 @@ clear_screen() ->
     end.
 
 with_mock(Mod, FnName, MockedFn, Fun) ->
-    ok = meck:new(Mod, [non_strict, no_link, no_history, passthrough]),
+    ok = meck:new(Mod, [no_link, no_history, passthrough]),
     ok = meck:expect(Mod, FnName, MockedFn),
     try
         Fun()

+ 2 - 3
apps/emqx/test/emqx_listeners_SUITE.erl

@@ -379,9 +379,8 @@ t_wss_update_opts(Config) ->
         ),
 
         %% Unable to connect with old SSL options, server's cert is signed by another CA.
-        %% Due to a bug `emqtt` exits with `badmatch` in this case.
-        ?assertExit(
-            _Badmatch,
+        ?assertError(
+            timeout,
             emqtt_connect_wss(Host, Port, ClientSSLOpts)
         ),
 

+ 1 - 1
apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl

@@ -1039,7 +1039,7 @@ t_share_subscribe_no_local(Config) ->
     %% MQTT-5.0 [MQTT-3.8.3-4] and [MQTT-4.13.1-1] (Disconnect)
     case catch emqtt:subscribe(Client, #{}, [{ShareTopic, [{nl, true}, {qos, 1}]}]) of
         {'EXIT', {Reason, _Stk}} ->
-            ?assertEqual({disconnected, ?RC_PROTOCOL_ERROR, #{}}, Reason)
+            ?assertEqual({shutdown, {disconnected, ?RC_PROTOCOL_ERROR, #{}}}, Reason)
     end,
 
     process_flag(trap_exit, false).

+ 1 - 1
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -368,7 +368,7 @@ t_connect_discards_existing_client(Config) ->
 
     receive
         {'DOWN', MRef, process, Client1, Reason} ->
-            ok = ?assertMatch({disconnected, ?RC_SESSION_TAKEN_OVER, _}, Reason),
+            ok = ?assertMatch({shutdown, {disconnected, ?RC_SESSION_TAKEN_OVER, _}}, Reason),
             ok = emqtt:stop(Client2),
             ok
     after 1000 ->

+ 2 - 2
apps/emqx/test/emqx_takeover_SUITE.erl

@@ -1072,7 +1072,7 @@ filter_payload(List, Payload) when is_binary(Payload) ->
 %% @doc assert emqtt *client* process exits as expected.
 assert_client_exit(Pid, v5, takenover) ->
     %% @ref: MQTT 5.0 spec [MQTT-3.1.4-3]
-    ?assertReceive({'EXIT', Pid, {disconnected, ?RC_SESSION_TAKEN_OVER, _}});
+    ?assertReceive({'EXIT', Pid, {shutdown, {disconnected, ?RC_SESSION_TAKEN_OVER, _}}});
 assert_client_exit(Pid, v3, takenover) ->
     ?assertReceive(
         {'EXIT', Pid, {shutdown, Reason}} when
@@ -1084,7 +1084,7 @@ assert_client_exit(Pid, v3, takenover) ->
 assert_client_exit(Pid, v3, kicked) ->
     ?assertReceive({'EXIT', Pid, _}, 1_000, #{pid => Pid});
 assert_client_exit(Pid, v5, kicked) ->
-    ?assertReceive({'EXIT', Pid, {disconnected, ?RC_ADMINISTRATIVE_ACTION, _}});
+    ?assertReceive({'EXIT', Pid, {shutdown, {disconnected, ?RC_ADMINISTRATIVE_ACTION, _}}});
 assert_client_exit(Pid, _, killed) ->
     ?assertReceive({'EXIT', Pid, killed}).
 

+ 1 - 1
apps/emqx_auth_http/test/emqx_authn_http_SUITE.erl

@@ -584,7 +584,7 @@ t_auth_expire(_Config) ->
             {ok, _} = emqtt:connect(C),
             receive
                 {'DOWN', _Ref, process, C, Reason} ->
-                    ?assertMatch({disconnected, ?RC_NOT_AUTHORIZED, _}, Reason)
+                    ?assertMatch({shutdown, {disconnected, ?RC_NOT_AUTHORIZED, _}}, Reason)
             after WaitTime ->
                 error(timeout)
             end

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge, [
     {description, "EMQX bridges"},
-    {vsn, "0.2.6"},
+    {vsn, "0.2.7"},
     {registered, [emqx_bridge_sup]},
     {mod, {emqx_bridge_app, []}},
     {applications, [

+ 11 - 2
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -524,7 +524,6 @@ uninstall_bridge_v2(
     BridgeV2Id = id_with_root_name(ConfRootKey, BridgeV2Type, BridgeName, ConnectorName),
     CreationOpts = emqx_resource:fetch_creation_opts(Config),
     ok = emqx_resource_buffer_worker_sup:stop_workers(BridgeV2Id, CreationOpts),
-    ok = emqx_resource:clear_metrics(BridgeV2Id),
     case referenced_connectors_exist(BridgeV2Type, ConnectorName, BridgeName) of
         {error, _} ->
             ok;
@@ -533,7 +532,14 @@ uninstall_bridge_v2(
             ConnectorId = emqx_connector_resource:resource_id(
                 connector_type(BridgeV2Type), ConnectorName
             ),
-            emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id)
+            Res = emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id),
+            case Res of
+                ok ->
+                    ok = emqx_resource:clear_metrics(BridgeV2Id);
+                _ ->
+                    ok
+            end,
+            Res
     end.
 
 combine_connector_and_bridge_v2_config(
@@ -1182,6 +1188,9 @@ post_config_update([ConfRootKey, BridgeType, BridgeName], _Req, NewConf, OldConf
         {error, timeout} ->
             ErrorContext = #{
                 error => uninstall_timeout,
+                bridge_kind => ConfRootKey,
+                type => BridgeType,
+                name => BridgeName,
                 reason => <<
                     "Timed out trying to remove action or source.  Please try again and,"
                     " if the error persists, try disabling the connector before retrying."

+ 10 - 0
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -486,6 +486,16 @@ get_action_api(Config) ->
     ct:pal("get action (http) result:\n  ~p", [Res]),
     Res.
 
+get_action_metrics_api(Config) ->
+    ActionName = ?config(action_name, Config),
+    ActionType = ?config(action_type, Config),
+    ActionId = emqx_bridge_resource:bridge_id(ActionType, ActionName),
+    Path = emqx_mgmt_api_test_util:api_path(["actions", ActionId, "metrics"]),
+    ct:pal("getting action (http)"),
+    Res = request(get, Path, []),
+    ct:pal("get action (http) result:\n  ~p", [Res]),
+    simplify_result(Res).
+
 update_bridge_api(Config) ->
     update_bridge_api(Config, _Overrides = #{}).
 

+ 1 - 18
apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl

@@ -1003,25 +1003,8 @@ t_authentication_error_on_send_message(Config0) ->
         end,
     Config = lists:keyreplace(greptimedb_config, 1, Config0, {greptimedb_config, GreptimeConfig}),
 
-    % Fake initialization to simulate credential update after bridge was created.
-    emqx_common_test_helpers:with_mock(
-        greptimedb,
-        check_auth,
-        fun(_) ->
-            ok
-        end,
-        fun() ->
-            {ok, _} = create_bridge(Config),
-            ResourceId = resource_id(Config0),
-            ?retry(
-                _Sleep = 1_000,
-                _Attempts = 10,
-                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
-            )
-        end
-    ),
+    {ok, _} = create_bridge(Config),
 
-    % Now back to wrong credentials
     ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
     Payload = #{
         int_key => -123,

+ 1 - 0
apps/emqx_bridge_mongodb/docker-ct

@@ -1,2 +1,3 @@
+toxiproxy
 mongo
 mongo_rs_sharded

+ 1 - 1
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_mongodb, [
     {description, "EMQX Enterprise MongoDB Bridge"},
-    {vsn, "0.3.3"},
+    {vsn, "0.3.4"},
     {registered, []},
     {applications, [
         kernel,

+ 7 - 2
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl

@@ -61,8 +61,13 @@ on_get_channel_status(InstanceId, _ChannelId, State) ->
 on_get_channels(InstanceId) ->
     emqx_bridge_v2:get_channels_for_connector(InstanceId).
 
-on_get_status(InstanceId, _State = #{connector_state := ConnectorState}) ->
-    emqx_mongodb:on_get_status(InstanceId, ConnectorState).
+on_get_status(InstanceId, ConnectorState = #{connector_state := DriverState0}) ->
+    case emqx_mongodb:on_get_status(InstanceId, DriverState0) of
+        {Status, DriverState, Reason} ->
+            {Status, ConnectorState#{connector_state := DriverState}, Reason};
+        Status when is_atom(Status) ->
+            Status
+    end.
 
 on_query(InstanceId, {Channel, Message0}, #{channels := Channels, connector_state := ConnectorState}) ->
     #{

+ 148 - 13
apps/emqx_bridge_mongodb/test/emqx_bridge_v2_mongodb_SUITE.erl

@@ -19,9 +19,10 @@
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--define(BRIDGE_TYPE, mongodb).
--define(BRIDGE_TYPE_BIN, <<"mongodb">>).
+-define(ACTION_TYPE, mongodb).
+-define(ACTION_TYPE_BIN, <<"mongodb">>).
 -define(CONNECTOR_TYPE, mongodb).
 -define(CONNECTOR_TYPE_BIN, <<"mongodb">>).
 
@@ -36,8 +37,11 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    MongoHost = os:getenv("MONGO_SINGLE_HOST", "mongo"),
+    MongoHost = os:getenv("MONGO_SINGLE_HOST", "toxiproxy"),
     MongoPort = list_to_integer(os:getenv("MONGO_SINGLE_PORT", "27017")),
+    ProxyHost = "toxiproxy",
+    ProxyPort = 8474,
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
     case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of
         true ->
             Apps = emqx_cth_suite:start(
@@ -49,14 +53,15 @@ init_per_suite(Config) ->
                     emqx_bridge_mongodb,
                     emqx_rule_engine,
                     emqx_management,
-                    {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+                    emqx_mgmt_api_test_util:emqx_dashboard()
                 ],
                 #{work_dir => emqx_cth_suite:work_dir(Config)}
             ),
-            {ok, Api} = emqx_common_test_http:create_default_app(),
             [
+                {proxy_name, "mongo_single_tcp"},
+                {proxy_host, ProxyHost},
+                {proxy_port, ProxyPort},
                 {apps, Apps},
-                {api, Api},
                 {mongo_host, MongoHost},
                 {mongo_port, MongoPort}
                 | Config
@@ -80,6 +85,9 @@ init_per_testcase(TestCase, Config) ->
 
 common_init_per_testcase(TestCase, Config) ->
     ct:timetrap(timer:seconds(60)),
+    ProxyHost = ?config(proxy_host, Config),
+    ProxyPort = ?config(proxy_port, Config),
+    emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
     emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
     emqx_config:delete_override_conf_files(),
     UniqueNum = integer_to_binary(erlang:unique_integer()),
@@ -97,15 +105,16 @@ common_init_per_testcase(TestCase, Config) ->
         | Config
     ],
     ConnectorConfig = connector_config(Name, NConfig),
-    BridgeConfig = bridge_config(Name, Name),
+    BridgeConfig = action_config(Name, Name),
     ok = snabbkaffe:start_trace(),
     [
+        {bridge_kind, action},
         {connector_type, ?CONNECTOR_TYPE},
         {connector_name, Name},
         {connector_config, ConnectorConfig},
-        {bridge_type, ?BRIDGE_TYPE},
-        {bridge_name, Name},
-        {bridge_config, BridgeConfig}
+        {action_type, ?ACTION_TYPE},
+        {action_name, Name},
+        {action_config, BridgeConfig}
         | NConfig
     ].
 
@@ -164,7 +173,7 @@ parse_and_check_connector_config(InnerConfigMap, Name) ->
     ct:pal("parsed config: ~p", [Config]),
     InnerConfigMap.
 
-bridge_config(Name, ConnectorId) ->
+action_config(Name, ConnectorId) ->
     InnerConfigMap0 =
         #{
             <<"enable">> => true,
@@ -197,7 +206,7 @@ serde_roundtrip(InnerConfigMap0) ->
     InnerConfigMap.
 
 parse_and_check_bridge_config(InnerConfigMap, Name) ->
-    emqx_bridge_v2_testlib:parse_and_check(?BRIDGE_TYPE_BIN, Name, InnerConfigMap).
+    emqx_bridge_v2_testlib:parse_and_check(?ACTION_TYPE_BIN, Name, InnerConfigMap).
 
 shared_secret_path() ->
     os:getenv("CI_SHARED_SECRET_PATH", "/var/lib/secret").
@@ -221,6 +230,39 @@ make_message() ->
         timestamp => Time
     }.
 
+create_bridge_api(Config, Overrides) ->
+    emqx_bridge_v2_testlib:simplify_result(
+        emqx_bridge_v2_testlib:create_bridge_api(Config, Overrides)
+    ).
+
+create_action_api(Config, Overrides) ->
+    emqx_bridge_v2_testlib:simplify_result(
+        emqx_bridge_v2_testlib:create_kind_api(Config, Overrides)
+    ).
+
+get_connector_api(Config) ->
+    ConnectorName = ?config(connector_name, Config),
+    emqx_bridge_v2_testlib:simplify_result(
+        emqx_bridge_v2_testlib:get_connector_api(
+            ?CONNECTOR_TYPE,
+            ConnectorName
+        )
+    ).
+
+get_action_api(Config) ->
+    emqx_bridge_v2_testlib:simplify_result(
+        emqx_bridge_v2_testlib:get_action_api(
+            Config
+        )
+    ).
+
+delete_action_api(Config) ->
+    emqx_bridge_v2_testlib:delete_kind_api(
+        action,
+        ?ACTION_TYPE,
+        ?config(action_name, Config)
+    ).
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -234,7 +276,7 @@ t_create_via_http(Config) ->
     ok.
 
 t_on_get_status(Config) ->
-    emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
+    emqx_bridge_v2_testlib:t_on_get_status(Config),
     ok.
 
 t_sync_query(Config) ->
@@ -245,3 +287,96 @@ t_sync_query(Config) ->
         mongo_bridge_connector_on_query_return
     ),
     ok.
+
+%% Checks that we don't mangle the connector state if the underlying connector
+%% implementation returns a tuple with new state.
+%% See also: https://emqx.atlassian.net/browse/EMQX-13496.
+t_timeout_during_connector_health_check(Config0) ->
+    ProxyName = ?config(proxy_name, Config0),
+    ProxyHost = ?config(proxy_host, Config0),
+    ProxyPort = ?config(proxy_port, Config0),
+    ConnectorName = ?config(connector_name, Config0),
+    Overrides = #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"700ms">>}},
+    Config = emqx_bridge_v2_testlib:proplist_update(
+        Config0,
+        connector_config,
+        fun(Cfg) ->
+            emqx_utils_maps:deep_merge(Cfg, Overrides)
+        end
+    ),
+    ?check_trace(
+        begin
+            {201, _} = create_bridge_api(Config, Overrides),
+
+            %% emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
+            %%   ?retry(500, 10, ?assertEqual(<<"disconnected">>, GetConnectorStatus())),
+            %%   ok
+            %% end),
+            %% Wait until it's disconnected
+            emqx_common_test_helpers:with_mock(
+                emqx_resource_pool,
+                health_check_workers,
+                fun(_Pool, _Fun, _Timeout, _Opts) -> {error, timeout} end,
+                fun() ->
+                    ?retry(
+                        1_000,
+                        10,
+                        ?assertMatch(
+                            {200, #{<<"status">> := <<"disconnected">>}},
+                            get_connector_api(Config)
+                        )
+                    ),
+                    ?retry(
+                        1_000,
+                        10,
+                        ?assertMatch(
+                            {200, #{<<"status">> := <<"disconnected">>}},
+                            get_action_api(Config)
+                        )
+                    ),
+                    ok
+                end
+            ),
+
+            %% Wait for recovery
+            ?retry(
+                1_000,
+                10,
+                ?assertMatch(
+                    {200, #{<<"status">> := <<"connected">>}},
+                    get_connector_api(Config)
+                )
+            ),
+
+            RuleTopic = <<"t/timeout">>,
+            {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(
+                ?ACTION_TYPE, RuleTopic, Config
+            ),
+
+            %% Action should be fine, including its metrics.
+            ?retry(
+                1_000,
+                10,
+                ?assertMatch(
+                    {200, #{<<"status">> := <<"connected">>}},
+                    get_action_api(Config)
+                )
+            ),
+            emqx:publish(emqx_message:make(RuleTopic, <<"hey">>)),
+            ?retry(
+                1_000,
+                10,
+                ?assertMatch(
+                    {200, #{<<"metrics">> := #{<<"matched">> := 1}}},
+                    emqx_bridge_v2_testlib:get_action_metrics_api(Config)
+                )
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertEqual([], ?of_kind("health_check_exception", Trace)),
+            ?assertEqual([], ?of_kind("remove_channel_failed", Trace)),
+            ok
+        end
+    ),
+    ok.

+ 62 - 4
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -41,7 +41,13 @@
 -export([tr_prometheus_collectors/1]).
 
 %% internal exports for `emqx_enterprise_schema' only.
--export([ensure_unicode_path/2, convert_rotation/2, log_handler_common_confs/2]).
+-export([
+    log_file_path_converter/2,
+    fix_old_version_abs_log_path/1,
+    ensure_unicode_path/2,
+    convert_rotation/2,
+    log_handler_common_confs/2
+]).
 
 -define(DEFAULT_NODE_NAME, <<"emqx@127.0.0.1">>).
 
@@ -84,6 +90,7 @@
     dropped_msg_due_to_mqueue_is_full,
     external_broker_crashed,
     failed_to_retain_message,
+    handle_resource_metrics_failed,
     socket_receive_paused_by_rate_limit,
     unrecoverable_resource_error
 ]).
@@ -955,9 +962,7 @@ fields("log_file_handler") ->
                     default => <<"${EMQX_LOG_DIR}/emqx.log">>,
                     aliases => [file, to],
                     importance => ?IMPORTANCE_HIGH,
-                    converter => fun(Path, Opts) ->
-                        emqx_schema:naive_env_interpolation(ensure_unicode_path(Path, Opts))
-                    end
+                    converter => fun log_file_path_converter/2
                 }
             )},
         {"rotation_count",
@@ -1519,6 +1524,59 @@ convert_rotation(#{} = Rotation, _Opts) -> maps:get(<<"count">>, Rotation, 10);
 convert_rotation(Count, _Opts) when is_integer(Count) -> Count;
 convert_rotation(Count, _Opts) -> throw({"bad_rotation", Count}).
 
+log_file_path_converter(Path, Opts) ->
+    Fixed = fix_old_version_abs_log_path(Path),
+    ensure_unicode_path(Fixed, Opts).
+
+%% Prior to 5.8.3, the log file paths are resolved by scehma module
+%% and the interpolated paths (absolute paths) are exported.
+%% When exported from docker but import to a non-docker environment,
+%% the absolute paths are not valid anymore.
+%% Here we try to fix the old version absolute paths.
+fix_old_version_abs_log_path(Bin) when is_binary(Bin) ->
+    try
+        List = [_ | _] = unicode:characters_to_list(Bin, utf8),
+        Fixed = fix_old_version_abs_log_path(List),
+        unicode:characters_to_binary(Fixed, utf8)
+    catch
+        _:_ ->
+            %% defer the validation to ensure_unicode_path
+            Bin
+    end;
+fix_old_version_abs_log_path("/opt/emqx/log/" ++ Name) ->
+    maybe_subst_log_dir("/opt/emqx/log", Name);
+fix_old_version_abs_log_path("/var/log/emqx/" ++ Name) ->
+    maybe_subst_log_dir("/var/log/emqx", Name);
+fix_old_version_abs_log_path(Other) ->
+    %% undefined, or other log dir
+    Other.
+
+%% Substitute the log dir with environment variable EMQX_LOG_DIR
+%% when possible
+maybe_subst_log_dir(Dir, Name) ->
+    Env = os:getenv("EMQX_LOG_DIR"),
+    IsEnvSet = (Env =/= false andalso Env =/= ""),
+    case Env =:= Dir of
+        true ->
+            %% the path is the same as the environment variable
+            %% substitute it with the environment variable
+            "${EMQX_LOG_DIR}/" ++ Name;
+        false ->
+            case filelib:is_dir(Dir) of
+                true ->
+                    %% the path exists, keep it
+                    filename:join(Dir, Name);
+                false when IsEnvSet ->
+                    %% the path does not exist, but the environment variable is set
+                    %% substitute it with the environment variable
+                    "${EMQX_LOG_DIR}/" ++ Name;
+                false ->
+                    %% the path does not exist, and the environment variable is not set
+                    %% keep it
+                    filename:join(Dir, Name)
+            end
+    end.
+
 ensure_unicode_path(Path, Opts) ->
     emqx_schema:ensure_unicode_path(Path, Opts).
 

+ 31 - 1
apps/emqx_conf/test/emqx_conf_schema_tests.erl

@@ -509,7 +509,7 @@ log_path_test_() ->
     end,
 
     [
-        {"default-values", fun() -> Assert(default, "log/emqx.log", check(#{})) end},
+        {"default-values", fun() -> Assert(default, "${EMQX_LOG_DIR}/emqx.log", check(#{})) end},
         {"file path with space", fun() -> Assert(name1, "a /b", check(Fh(<<"a /b">>))) end},
         {"windows", fun() -> Assert(name1, "c:\\a\\ b\\", check(Fh(<<"c:\\a\\ b\\">>))) end},
         {"unicoded", fun() -> Assert(name1, "路 径", check(Fh(<<"路 径"/utf8>>))) end},
@@ -714,3 +714,33 @@ node_role_conf(Role0) ->
     Hocon = <<"node { role =", Role/binary, ", cookie = \"cookie\", data_dir = \".\" }">>,
     {ok, ConfMap} = hocon:binary(Hocon, #{format => map}),
     ConfMap.
+
+fix_log_dir_path_test() ->
+    ?assertEqual(
+        "/opt/emqx/log/a.log",
+        emqx_conf_schema:fix_old_version_abs_log_path("/opt/emqx/log/a.log")
+    ),
+    ?assertEqual(
+        "/var/log/emqx/a.log",
+        emqx_conf_schema:fix_old_version_abs_log_path("/var/log/emqx/a.log")
+    ),
+    try
+        os:putenv("EMQX_LOG_DIR", "foobar"),
+        %% assumption: the two hard coded paths below do not exist in CT test runner
+        ?assertEqual(
+            "${EMQX_LOG_DIR}/a.log",
+            emqx_conf_schema:fix_old_version_abs_log_path("/var/log/emqx/a.log")
+        ),
+        ?assertEqual(
+            "${EMQX_LOG_DIR}/a.log",
+            emqx_conf_schema:fix_old_version_abs_log_path("/opt/emqx/log/a.log")
+        ),
+        %% binary in binary out
+        ?assertEqual(
+            <<"${EMQX_LOG_DIR}/a.log">>,
+            emqx_conf_schema:fix_old_version_abs_log_path(<<"/var/log/emqx/a.log">>)
+        )
+    after
+        os:unsetenv("EMQX_LOG_DIR")
+    end,
+    ok.

+ 46 - 6
apps/emqx_ds_builtin_raft/src/emqx_ds_builtin_raft_db_sup.erl

@@ -14,6 +14,7 @@
     start_shard/1,
     start_egress/1,
     stop_shard/1,
+    shard_info/2,
     terminate_storage/1,
     restart_storage/1,
     ensure_shard/1,
@@ -31,23 +32,26 @@
 -export([init/1]).
 
 %% internal exports:
--export([start_link_sup/2]).
+-export([start_link_sup/2, start_link_sentinel/1, init_sentinel/2]).
 
 %%================================================================================
 %% Type declarations
 %%================================================================================
 
--define(via(REC), {via, gproc, {n, l, REC}}).
+-define(name(REC), {n, l, REC}).
+-define(via(REC), {via, gproc, ?name(REC)}).
 
 -define(db_sup, ?MODULE).
--define(shards_sup, emqx_ds_builtin_db_shards_sup).
--define(egress_sup, emqx_ds_builtin_db_egress_sup).
--define(shard_sup, emqx_ds_builtin_db_shard_sup).
+-define(shards_sup, emqx_ds_builtin_raft_db_shards_sup).
+-define(egress_sup, emqx_ds_builtin_raft_db_egress_sup).
+-define(shard_sup, emqx_ds_builtin_raft_db_shard_sup).
+-define(shard_sentinel, emqx_ds_builtin_raft_db_shard_sentinel).
 
 -record(?db_sup, {db}).
 -record(?shards_sup, {db}).
 -record(?egress_sup, {db}).
 -record(?shard_sup, {db, shard}).
+-record(?shard_sentinel, {shardid}).
 
 %%================================================================================
 %% API functions
@@ -77,6 +81,13 @@ stop_shard({DB, Shard}) ->
             {error, Reason}
     end.
 
+-spec shard_info(emqx_ds_storage_layer:shard_id(), ready) -> boolean() | down.
+shard_info(ShardId = {DB, Shard}, Info) ->
+    case sentinel_alive(ShardId) of
+        true -> emqx_ds_replication_layer_shard:shard_info(DB, Shard, Info);
+        false -> down
+    end.
+
 -spec terminate_storage(emqx_ds_storage_layer:shard_id()) -> ok | {error, _Reason}.
 terminate_storage({DB, Shard}) ->
     Sup = ?via(#?shard_sup{db = DB, shard = Shard}),
@@ -181,7 +192,8 @@ init({#?shard_sup{db = DB, shard = Shard}, _}) ->
     Children = [
         shard_storage_spec(DB, Shard, Opts),
         shard_replication_spec(DB, Shard, Opts),
-        shard_beamformers_spec(DB, Shard)
+        shard_beamformers_spec(DB, Shard),
+        shard_sentinel_spec(DB, Shard)
     ],
     {ok, {SupFlags, Children}}.
 
@@ -220,6 +232,22 @@ start_ra_system(DB, #{replication_options := ReplicationOpts}) ->
 start_link_sup(Id, Options) ->
     supervisor:start_link(?via(Id), ?MODULE, {Id, Options}).
 
+-spec start_link_sentinel(emqx_ds_storage_layer:shard_id()) -> {ok, pid()}.
+start_link_sentinel(Id) ->
+    proc_lib:start_link(?MODULE, init_sentinel, [self(), Id]).
+
+-spec init_sentinel(pid(), emqx_ds_storage_layer:shard_id()) -> no_return().
+init_sentinel(Parent, Id) ->
+    Name = ?name(#?shard_sentinel{shardid = Id}),
+    gproc:reg(Name),
+    proc_lib:init_ack(Parent, {ok, self()}),
+    receive
+        %% Not trapping exits, but just in case.
+        {'EXIT', _Pid, Reason} ->
+            gproc:unreg(Name),
+            exit(Reason)
+    end.
+
 %%================================================================================
 %% Internal functions
 %%================================================================================
@@ -292,6 +320,18 @@ shard_beamformers_spec(DB, Shard) ->
             ]}
     }.
 
+shard_sentinel_spec(DB, Shard) ->
+    #{
+        id => {Shard, sentinel},
+        type => worker,
+        restart => permanent,
+        shutdown => brutal_kill,
+        start => {?MODULE, start_link_sentinel, [{DB, Shard}]}
+    }.
+
+sentinel_alive(Id) ->
+    gproc:where(?name(#?shard_sentinel{shardid = Id})) =/= undefined.
+
 ensure_started(Res) ->
     case Res of
         {ok, _Pid} ->

+ 30 - 22
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl

@@ -419,12 +419,23 @@ poll(DB, Iterators, PollOpts = #{timeout := Timeout}) ->
     ),
     maps:foreach(
         fun(Shard, ShardIts) ->
-            ok = ra_poll(
+            Result = ra_poll(
                 DB,
                 Shard,
                 [{{ReplyTo, Token}, It} || {Token, It} <- ShardIts],
                 PollOpts
-            )
+            ),
+            case Result of
+                ok ->
+                    ok;
+                {error, Class, Reason} ->
+                    ?tp(debug, ds_repl_poll_shard_failed, #{
+                        db => DB,
+                        shard => Shard,
+                        class => Class,
+                        reason => Reason
+                    })
+            end
         end,
         Groups
     ),
@@ -527,10 +538,10 @@ shards_of_batch(_DB, [], Acc) ->
 %% TODO
 %% There's a possibility of race condition: storage may shut down right after we
 %% ask for its status.
--define(IF_SHARD_READY(DB, SHARD, EXPR),
-    case emqx_ds_replication_layer_shard:shard_info(DB, SHARD, ready) of
+-define(IF_SHARD_READY(SHARDID, EXPR),
+    case emqx_ds_builtin_raft_db_sup:shard_info(SHARDID, ready) of
         true -> EXPR;
-        false -> {error, recoverable, shard_unavailable}
+        _Unready -> {error, recoverable, shard_unavailable}
     end
 ).
 
@@ -574,8 +585,7 @@ do_get_streams_v1(_DB, _Shard, _TopicFilter, _StartTime) ->
 do_get_streams_v2(DB, Shard, TopicFilter, StartTime) ->
     ShardId = {DB, Shard},
     ?IF_SHARD_READY(
-        DB,
-        Shard,
+        ShardId,
         emqx_ds_storage_layer:get_streams(ShardId, TopicFilter, StartTime)
     ).
 
@@ -602,8 +612,7 @@ do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) ->
 do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) ->
     ShardId = {DB, Shard},
     ?IF_SHARD_READY(
-        DB,
-        Shard,
+        ShardId,
         emqx_ds_storage_layer:make_iterator(ShardId, Stream, TopicFilter, StartTime)
     ).
 
@@ -638,8 +647,7 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) ->
 do_next_v1(DB, Shard, Iter, BatchSize) ->
     ShardId = {DB, Shard},
     ?IF_SHARD_READY(
-        DB,
-        Shard,
+        ShardId,
         emqx_ds_storage_layer:next(
             ShardId, Iter, BatchSize, emqx_ds_replication_layer:current_timestamp(DB, Shard)
         )
@@ -672,8 +680,7 @@ do_add_generation_v2(_DB) ->
 do_list_generations_with_lifetimes_v3(DB, Shard) ->
     ShardId = {DB, Shard},
     ?IF_SHARD_READY(
-        DB,
-        Shard,
+        ShardId,
         emqx_ds_storage_layer:list_generations_with_lifetimes(ShardId)
     ).
 
@@ -700,11 +707,14 @@ do_get_delete_streams_v4(DB, Shard, TopicFilter, StartTime) ->
 do_poll_v1(SourceNode, DB, Shard, Iterators, PollOpts) ->
     ShardId = {DB, Shard},
     ?tp(ds_raft_do_poll, #{shard => ShardId, iterators => Iterators}),
-    lists:foreach(
-        fun({RAddr, It}) ->
-            emqx_ds_beamformer:poll(SourceNode, RAddr, ShardId, It, PollOpts)
-        end,
-        Iterators
+    ?IF_SHARD_READY(
+        ShardId,
+        lists:foreach(
+            fun({RAddr, It}) ->
+                emqx_ds_beamformer:poll(SourceNode, RAddr, ShardId, It, PollOpts)
+            end,
+            Iterators
+        )
     ).
 
 %%================================================================================
@@ -1249,11 +1259,9 @@ snapshot_module() ->
 unpack_iterator(Shard, #{?tag := ?IT, ?enc := Iterator}) ->
     emqx_ds_storage_layer:unpack_iterator(Shard, Iterator).
 
-scan_stream(ShardId, Stream, TopicFilter, StartMsg, BatchSize) ->
-    {DB, Shard} = ShardId,
+scan_stream(ShardId = {DB, Shard}, Stream, TopicFilter, StartMsg, BatchSize) ->
     ?IF_SHARD_READY(
-        DB,
-        Shard,
+        ShardId,
         begin
             Now = current_timestamp(DB, Shard),
             emqx_ds_storage_layer:scan_stream(

+ 17 - 5
apps/emqx_ds_builtin_raft/test/emqx_ds_replication_SUITE.erl

@@ -772,17 +772,18 @@ t_error_mapping_replication_layer(init, Config) ->
     Apps = emqx_cth_suite:start([emqx_ds_builtin_raft], #{
         work_dir => ?config(work_dir, Config)
     }),
+    ok = snabbkaffe:start_trace(),
+    ok = emqx_ds_test_helpers:mock_rpc(),
     [{apps, Apps} | Config];
 t_error_mapping_replication_layer('end', Config) ->
+    emqx_ds_test_helpers:unmock_rpc(),
+    snabbkaffe:stop(),
     emqx_cth_suite:stop(?config(apps, Config)),
     Config.
 
 t_error_mapping_replication_layer(Config) ->
     %% This checks that the replication layer maps recoverable errors correctly.
 
-    ok = emqx_ds_test_helpers:mock_rpc(),
-    ok = snabbkaffe:start_trace(),
-
     DB = ?FUNCTION_NAME,
     ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config, #{n_shards => 2}))),
     [Shard1, Shard2] = emqx_ds_replication_layer_meta:shards(DB),
@@ -865,7 +866,19 @@ t_error_mapping_replication_layer(Config) ->
         length([error || {error, _, _} <- Results2]) > 0,
         Results2
     ),
-    meck:unload().
+
+    %% Calling `emqx_ds:poll/3` succeeds, but some poll requests should fail anyway.
+    {ok, SRef} = snabbkaffe:subscribe(
+        ?match_event(#{?snk_kind := ds_repl_poll_shard_failed}),
+        length(Streams0),
+        500
+    ),
+    UserData = ?FUNCTION_NAME,
+    {ok, _PollRef} = emqx_ds:poll(DB, [{UserData, I} || I <- Iterators0], #{timeout => 1_000}),
+    ?assertMatch(
+        {timeout, Events} when length(Events) > 0,
+        snabbkaffe:receive_events(SRef)
+    ).
 
 %% This testcase verifies the behavior of `store_batch' operation
 %% when the underlying code experiences recoverable or unrecoverable
@@ -1068,7 +1081,6 @@ t_poll('end', Config) ->
     ok = emqx_cth_cluster:stop(?config(nodes, Config)).
 
 t_poll(Config) ->
-    DB = ?FUNCTION_NAME,
     Nodes = [N1 | _] = ?config(nodes, Config),
     ?check_trace(
         #{timetrap => 15_000},

+ 1 - 1
apps/emqx_enterprise/src/emqx_enterprise.app.src

@@ -1,6 +1,6 @@
 {application, emqx_enterprise, [
     {description, "EMQX Enterprise Edition"},
-    {vsn, "0.2.4"},
+    {vsn, "0.2.5"},
     {registered, []},
     {applications, [
         kernel,

+ 1 - 5
apps/emqx_enterprise/src/emqx_enterprise_schema.erl

@@ -64,11 +64,7 @@ fields("log_audit_handler") ->
                     desc => ?DESC(emqx_conf_schema, "audit_file_handler_path"),
                     default => <<"${EMQX_LOG_DIR}/audit.log">>,
                     importance => ?IMPORTANCE_HIGH,
-                    converter => fun(Path, Opts) ->
-                        emqx_schema:naive_env_interpolation(
-                            emqx_conf_schema:ensure_unicode_path(Path, Opts)
-                        )
-                    end
+                    converter => fun emqx_conf_schema:log_file_path_converter/2
                 }
             )},
         {"rotation_count",

+ 2 - 2
apps/emqx_enterprise/test/emqx_enterprise_schema_SUITE.erl

@@ -78,7 +78,7 @@ t_audit_log_conf(_Config) ->
         <<"rotation_count">> => 10,
         <<"rotation_size">> => <<"50MB">>,
         <<"time_offset">> => <<"system">>,
-        <<"path">> => <<"log/emqx.log">>,
+        <<"path">> => <<"${EMQX_LOG_DIR}/emqx.log">>,
         <<"timestamp_format">> => <<"auto">>,
         <<"payload_encode">> => <<"text">>
     },
@@ -100,7 +100,7 @@ t_audit_log_conf(_Config) ->
             #{
                 <<"enable">> => false,
                 <<"level">> => <<"info">>,
-                <<"path">> => <<"log/audit.log">>,
+                <<"path">> => <<"${EMQX_LOG_DIR}/audit.log">>,
                 <<"ignore_high_frequency_request">> => true,
                 <<"max_filter_size">> => 5000,
                 <<"rotation_count">> => 10,

+ 1 - 1
apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl

@@ -227,7 +227,7 @@ t_explicit_session_takeover(Config) ->
         begin
             ok = rpc:call(Node1, emqx_eviction_agent, evict_connections, [1]),
             receive
-                {'EXIT', C0, {disconnected, ?RC_USE_ANOTHER_SERVER, _}} -> ok
+                {'EXIT', C0, {shutdown, {disconnected, ?RC_USE_ANOTHER_SERVER, _}}} -> ok
             after 1000 ->
                 ?assert(false, "Connection not evicted")
             end

+ 1 - 1
apps/emqx_mysql/mix.exs

@@ -23,7 +23,7 @@ defmodule EMQXMysql.MixProject do
 
   def deps() do
     [
-      {:mysql, github: "emqx/mysql-otp", tag: "1.7.4.2"},
+      {:mysql, github: "emqx/mysql-otp", tag: "1.7.4.3"},
       {:emqx_connector, in_umbrella: true, runtime: false},
       {:emqx_resource, in_umbrella: true}
     ]

+ 1 - 1
apps/emqx_mysql/rebar.config

@@ -3,7 +3,7 @@
 {erl_opts, [debug_info]}.
 {deps, [
     %% NOTE: mind ecpool version when updating eredis_cluster version
-    {mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.4.2"}}},
+    {mysql, {git, "https://github.com/emqx/mysql-otp", {tag, "1.7.4.3"}}},
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}}
 ]}.

+ 2 - 2
apps/emqx_node_rebalance/test/emqx_node_rebalance_evacuation_SUITE.erl

@@ -204,7 +204,7 @@ t_conn_evicted(Config) ->
     ),
 
     receive
-        {'EXIT', C, {disconnected, 156, _}} -> ok
+        {'EXIT', C, {shutdown, {disconnected, 156, _}}} -> ok
     after 1000 ->
         ct:fail("Connection not evicted")
     end.
@@ -245,7 +245,7 @@ t_session_evicted(Config) ->
     ),
 
     receive
-        {'EXIT', C, {disconnected, ?RC_USE_ANOTHER_SERVER, _}} -> ok
+        {'EXIT', C, {shutdown, {disconnected, ?RC_USE_ANOTHER_SERVER, _}}} -> ok
     after 1000 ->
         ct:fail("Connection not evicted")
     end,

+ 4 - 0
apps/emqx_opentelemetry/include/emqx_otel_trace.hrl

@@ -13,6 +13,10 @@
 -define(CLIENT_UNSUBSCRIBE_SPAN_NAME, 'client.unsubscribe').
 -define(CLIENT_PUBLISH_SPAN_NAME, 'client.publish').
 
+-define(BROKER_DISCONNECT_SPAN_NAME, 'broker.disconnect').
+-define(BROKER_SUBSCRIBE_SPAN_NAME, 'broker.subscribe').
+-define(BROKER_UNSUBSCRIBE_SPAN_NAME, 'broker.unsubscribe').
+
 -define(CLIENT_AUTHN_SPAN_NAME, 'client.authn').
 -define(CLIENT_AUTHZ_SPAN_NAME, 'client.authz').
 

+ 70 - 0
apps/emqx_opentelemetry/src/emqx_otel_trace.erl

@@ -24,6 +24,10 @@
     client_authn/3,
     client_authz/3,
 
+    broker_disconnect/3,
+    broker_subscribe/3,
+    broker_unsubscribe/3,
+
     %% Message Processing Spans (From Client)
     %% PUBLISH(form Publisher) -> ROUTE -> FORWARD(optional) -> DELIVER(to Subscribers)
     client_publish/3,
@@ -325,6 +329,72 @@ client_authz(Packet, Attrs, ProcessFun) ->
         )
     ).
 
+-spec broker_disconnect(
+    Any,
+    Attrs,
+    fun((Any) -> Res)
+) ->
+    Res
+when
+    Any :: term(),
+    Attrs :: attrs(),
+    Res :: term().
+broker_disconnect(Any, Attrs, ProcessFun) ->
+    ?with_trace_mode(
+        ProcessFun(Any),
+        ?with_span(
+            ?BROKER_DISCONNECT_SPAN_NAME,
+            #{attributes => Attrs},
+            fun(_SpanCtx) ->
+                ProcessFun(Any)
+            end
+        )
+    ).
+
+-spec broker_subscribe(
+    Any,
+    Attrs,
+    fun((Any) -> Res)
+) ->
+    Res
+when
+    Any :: term(),
+    Attrs :: attrs(),
+    Res :: term().
+broker_subscribe(Any, Attrs, ProcessFun) ->
+    ?with_trace_mode(
+        ProcessFun(Any),
+        ?with_span(
+            ?BROKER_SUBSCRIBE_SPAN_NAME,
+            #{attributes => Attrs},
+            fun(_SpanCtx) ->
+                ProcessFun(Any)
+            end
+        )
+    ).
+
+-spec broker_unsubscribe(
+    Any,
+    Attrs,
+    fun((Any) -> Res)
+) ->
+    Res
+when
+    Any :: term(),
+    Attrs :: attrs(),
+    Res :: term().
+broker_unsubscribe(Any, Attrs, ProcessFun) ->
+    ?with_trace_mode(
+        ProcessFun(Any),
+        ?with_span(
+            ?BROKER_UNSUBSCRIBE_SPAN_NAME,
+            #{attributes => Attrs},
+            fun(_SpanCtx) ->
+                ProcessFun(Any)
+            end
+        )
+    ).
+
 -spec client_publish(
     Packet,
     Attrs,

+ 9 - 3
apps/emqx_opentelemetry/src/sampler/emqx_otel_sampler.erl

@@ -180,7 +180,10 @@ should_sample(
         SpanName =:= ?CLIENT_DISCONNECT_SPAN_NAME orelse
         SpanName =:= ?CLIENT_SUBSCRIBE_SPAN_NAME orelse
         SpanName =:= ?CLIENT_UNSUBSCRIBE_SPAN_NAME orelse
-        SpanName =:= ?CLIENT_PUBLISH_SPAN_NAME
+        SpanName =:= ?CLIENT_PUBLISH_SPAN_NAME orelse
+        SpanName =:= ?BROKER_DISCONNECT_SPAN_NAME orelse
+        SpanName =:= ?BROKER_SUBSCRIBE_SPAN_NAME orelse
+        SpanName =:= ?BROKER_UNSUBSCRIBE_SPAN_NAME
 ->
     Desicion =
         decide_by_match_rule(Attributes, Opts) orelse
@@ -241,12 +244,15 @@ decide_by_traceid_ratio(TraceId, SpanName, #{id_upper := IdUpperBound} = Opts) -
 
 span_name_to_config_key(SpanName) when
     SpanName =:= ?CLIENT_CONNECT_SPAN_NAME orelse
-        SpanName =:= ?CLIENT_DISCONNECT_SPAN_NAME
+        SpanName =:= ?CLIENT_DISCONNECT_SPAN_NAME orelse
+        SpanName =:= ?BROKER_DISCONNECT_SPAN_NAME
 ->
     client_connect_disconnect;
 span_name_to_config_key(SpanName) when
     SpanName =:= ?CLIENT_SUBSCRIBE_SPAN_NAME orelse
-        SpanName =:= ?CLIENT_UNSUBSCRIBE_SPAN_NAME
+        SpanName =:= ?CLIENT_UNSUBSCRIBE_SPAN_NAME orelse
+        SpanName =:= ?BROKER_SUBSCRIBE_SPAN_NAME orelse
+        SpanName =:= ?BROKER_UNSUBSCRIBE_SPAN_NAME
 ->
     client_subscribe_unsubscribe;
 span_name_to_config_key(?CLIENT_PUBLISH_SPAN_NAME) ->

+ 27 - 21
apps/emqx_resource/src/emqx_resource.erl

@@ -51,6 +51,7 @@
     reset_metrics_local/2,
     %% Create metrics for a resource ID
     create_metrics/1,
+    ensure_metrics/1,
     %% Delete metrics for a resource ID
     clear_metrics/1
 ]).
@@ -725,33 +726,38 @@ deallocate_resource(InstanceId, Key) ->
 
 -spec create_metrics(resource_id()) -> ok.
 create_metrics(ResId) ->
-    emqx_metrics_worker:create_metrics(
-        ?RES_METRICS,
-        ResId,
-        [
-            'matched',
-            'retried',
-            'retried.success',
-            'retried.failed',
-            'success',
-            'late_reply',
-            'failed',
-            'dropped',
-            'dropped.expired',
-            'dropped.queue_full',
-            'dropped.resource_not_found',
-            'dropped.resource_stopped',
-            'dropped.other',
-            'received'
-        ],
-        [matched]
-    ).
+    emqx_metrics_worker:create_metrics(?RES_METRICS, ResId, metrics(), rate_metrics()).
+
+-spec ensure_metrics(resource_id()) -> {ok, created | already_created}.
+ensure_metrics(ResId) ->
+    emqx_metrics_worker:ensure_metrics(?RES_METRICS, ResId, metrics(), rate_metrics()).
 
 -spec clear_metrics(resource_id()) -> ok.
 clear_metrics(ResId) ->
     emqx_metrics_worker:clear_metrics(?RES_METRICS, ResId).
 %% =================================================================================
 
+metrics() ->
+    [
+        'matched',
+        'retried',
+        'retried.success',
+        'retried.failed',
+        'success',
+        'late_reply',
+        'failed',
+        'dropped',
+        'dropped.expired',
+        'dropped.queue_full',
+        'dropped.resource_not_found',
+        'dropped.resource_stopped',
+        'dropped.other',
+        'received'
+    ].
+
+rate_metrics() ->
+    ['matched'].
+
 filter_instances(Filter) ->
     [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
 

+ 13 - 0
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -402,6 +402,7 @@ get_metrics(ResId) ->
 %% @doc Reset the metrics for the specified resource
 -spec reset_metrics(resource_id()) -> ok.
 reset_metrics(ResId) ->
+    ok = ensure_metrics(ResId),
     emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId).
 
 %% @doc Returns the data for all resources
@@ -791,6 +792,7 @@ handle_remove_event(From, ClearMetrics, Data) ->
 start_resource(Data, From) ->
     %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache
     #data{id = ResId, mod = Mod, config = Config, group = Group, type = Type} = Data,
+    ok = ensure_metrics(ResId),
     case emqx_resource:call_start(ResId, Mod, Config) of
         {ok, ResourceState} ->
             UpdatedData1 = Data#data{status = ?status_connecting, state = ResourceState},
@@ -955,6 +957,7 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
                 added_channels = NewAddedChannelsMap
             };
         {error, Reason} ->
+            ?tp("remove_channel_failed", #{resource_id => ResId, reason => Reason}),
             ?SLOG(
                 log_level(IsDryRun),
                 #{
@@ -1083,6 +1086,7 @@ handle_remove_channel_exists(From, ChannelId, Data) ->
             {keep_state, update_state(UpdatedData), [{reply, From, ok}]};
         {error, Reason} = Error ->
             IsDryRun = emqx_resource:is_dry_run(Id),
+            ?tp("remove_channel_failed", #{resource_id => Id, reason => Reason}),
             ?SLOG(
                 log_level(IsDryRun),
                 #{
@@ -1682,6 +1686,7 @@ parse_health_check_result({Status, NewState}, _Data) when ?IS_STATUS(Status) ->
 parse_health_check_result({Status, NewState, Error}, _Data) when ?IS_STATUS(Status) ->
     {Status, NewState, {error, Error}};
 parse_health_check_result({error, Error}, Data) ->
+    ?tp("health_check_exception", #{resource_id => Data#data.id, reason => Error}),
     ?SLOG(
         error,
         #{
@@ -1899,3 +1904,11 @@ abort_channel_health_check(Pid) ->
         {'EXIT', Pid, _} ->
             ok
     end.
+
+%% For still unknown reasons (e.g.: `emqx_metrics_worker' process might die?), metrics
+%% might be lost for a running resource, and future attempts to bump them result in
+%% errors.  As mitigation, we ensure such metrics are created here so that restarting
+%% the resource or resetting its metrics can recreate them.
+ensure_metrics(ResId) ->
+    {ok, _} = emqx_resource:ensure_metrics(ResId),
+    ok.

+ 6 - 4
apps/emqx_resource/src/emqx_resource_metrics.erl

@@ -130,10 +130,11 @@ handle_telemetry_event(
             %% We catch errors to avoid detaching the telemetry handler function.
             %% When restarting a resource while it's under load, there might be transient
             %% failures while the metrics are not yet created.
-            ?SLOG(
+            ?SLOG_THROTTLE(
                 warning,
+                ID,
                 #{
-                    msg => "handle_resource_metrics_failed",
+                    msg => handle_resource_metrics_failed,
                     hint => "transient failures may occur when restarting a resource",
                     kind => Kind,
                     reason => Reason,
@@ -158,10 +159,11 @@ handle_telemetry_event(
             %% We catch errors to avoid detaching the telemetry handler function.
             %% When restarting a resource while it's under load, there might be transient
             %% failures while the metrics are not yet created.
-            ?SLOG(
+            ?SLOG_THROTTLE(
                 warning,
+                ID,
                 #{
-                    msg => "handle_resource_metrics_failed",
+                    msg => handle_resource_metrics_failed,
                     hint => "transient failures may occur when restarting a resource",
                     kind => Kind,
                     reason => Reason,

+ 3 - 4
apps/emqx_resource/src/emqx_resource_pool.erl

@@ -121,10 +121,9 @@ health_check_workers(PoolName, CheckFunc, Timeout, Opts) ->
             end
     end.
 
-parse_reason({
-    {shutdown, {failed_to_start_child, _, {shutdown, {failed_to_start_child, _, Reason}}}},
-    _
-}) ->
+parse_reason({worker_start_failed, Reason}) ->
+    Reason;
+parse_reason({worker_exit, Reason}) ->
     Reason;
 parse_reason(Reason) ->
     Reason.

+ 1 - 1
apps/emqx_retainer/rebar.config

@@ -30,7 +30,7 @@
 {profiles, [
     {test, [
         {deps, [
-            {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.13.0"}}}
+            {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.13.5"}}}
         ]}
     ]}
 ]}.

+ 82 - 22
apps/emqx_utils/src/emqx_metrics_worker.erl

@@ -45,6 +45,7 @@
     get_counters/2,
     create_metrics/3,
     create_metrics/4,
+    ensure_metrics/4,
     clear_metrics/2,
     reset_metrics/2,
     has_metrics/2
@@ -135,6 +136,13 @@
     slides = #{} :: #{metric_id() => #{metric_name() => #slide{}}}
 }).
 
+%% calls/casts/infos
+-record(ensure_metrics, {
+    id :: metric_id(),
+    metrics :: [metric_spec() | metric_name()],
+    rate_metrics :: [metric_name()]
+}).
+
 %%------------------------------------------------------------------------------
 %% APIs
 %%------------------------------------------------------------------------------
@@ -166,6 +174,16 @@ create_metrics(Name, Id, Metrics, RateMetrics) ->
     Metrics1 = desugar(Metrics),
     gen_server:call(Name, {create_metrics, Id, Metrics1, RateMetrics}).
 
+-spec ensure_metrics(handler_name(), metric_id(), [metric_spec() | metric_name()], [atom()]) ->
+    {ok, created | already_created} | {error, term()}.
+ensure_metrics(Name, Id, Metrics0, RateMetrics) ->
+    Metrics = desugar(Metrics0),
+    gen_server:call(
+        Name,
+        #ensure_metrics{id = Id, metrics = Metrics, rate_metrics = RateMetrics},
+        infinity
+    ).
+
 -spec clear_metrics(handler_name(), metric_id()) -> ok.
 clear_metrics(Name, Id) ->
     gen_server:call(Name, {delete_metrics, Id}).
@@ -379,28 +397,12 @@ handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) ->
             undefined -> make_rate(0, 0, 0);
             RatesPerId -> format_rates_of_id(RatesPerId)
         end, State};
-handle_call(
-    {create_metrics, Id, Metrics, RateMetrics},
-    _From,
-    State = #state{metric_ids = MIDs, rates = Rates, slides = Slides}
-) ->
-    case RateMetrics -- filter_counters(Metrics) of
-        [] ->
-            RatePerId = maps:from_list([{M, #rate{}} || M <- RateMetrics]),
-            Rate1 =
-                case Rates of
-                    undefined -> #{Id => RatePerId};
-                    _ -> Rates#{Id => RatePerId}
-                end,
-            Slides1 = Slides#{Id => create_slides(Metrics)},
-            {reply, create_counters(get_self_name(), Id, Metrics), State#state{
-                metric_ids = sets:add_element(Id, MIDs),
-                rates = Rate1,
-                slides = Slides1
-            }};
-        _ ->
-            {reply, {error, not_super_set_of, {RateMetrics, Metrics}}, State}
-    end;
+handle_call({create_metrics, Id, Metrics, RateMetrics}, _From, State0) ->
+    {Result, State} = handle_create_metrics(State0, Id, Metrics, RateMetrics),
+    {reply, Result, State};
+handle_call(#ensure_metrics{id = Id, metrics = Metrics, rate_metrics = RateMetrics}, _From, State0) ->
+    {Result, State} = handle_ensure_metrics(State0, Id, Metrics, RateMetrics),
+    {reply, Result, State};
 handle_call(
     {delete_metrics, Id},
     _From,
@@ -702,3 +704,61 @@ create_slides(Metrics) ->
 get_self_name() ->
     {registered_name, Name} = process_info(self(), registered_name),
     Name.
+
+is_superset_of(RateMetrics, Metrics) ->
+    case RateMetrics -- filter_counters(Metrics) of
+        [] ->
+            true;
+        [_ | _] ->
+            false
+    end.
+
+handle_create_metrics(State0, Id, Metrics, RateMetrics) ->
+    #state{metric_ids = MIDs0, rates = Rates0, slides = Slides0} = State0,
+    case is_superset_of(RateMetrics, Metrics) of
+        true ->
+            RatesPerId = maps:from_list([{M, #rate{}} || M <- RateMetrics]),
+            Rates =
+                case Rates0 of
+                    undefined -> #{Id => RatesPerId};
+                    _ -> Rates0#{Id => RatesPerId}
+                end,
+            Slides = Slides0#{Id => create_slides(Metrics)},
+            MIDs = sets:add_element(Id, MIDs0),
+            State = State0#state{
+                metric_ids = MIDs,
+                rates = Rates,
+                slides = Slides
+            },
+            Result = create_counters(get_self_name(), Id, Metrics),
+            {Result, State};
+        false ->
+            {{error, not_super_set_of, {RateMetrics, Metrics}}, State0}
+    end.
+
+handle_ensure_metrics(State0, Id, Metrics, RateMetrics) ->
+    #state{metric_ids = MIDs, rates = Rates, slides = Slides} = State0,
+    Name = get_self_name(),
+    CounterKeys = filter_counters(Metrics),
+    CurrentCounters = get_counters(Name, Id),
+    HasAllCounters = [] =:= (CounterKeys -- maps:keys(CurrentCounters)),
+    HasMetricId = sets:is_element(Id, MIDs),
+    CurrentRates =
+        %% todo: no need to have `undefined' here?
+        case Rates of
+            #{Id := Rs} -> maps:keys(Rs);
+            _ -> []
+        end,
+    HasRates = [] =:= (RateMetrics -- CurrentRates),
+    SlideKeys = [K || {slide, K} <- Metrics],
+    CurrentSlides = maps:keys(maps:get(Id, Slides, #{})),
+    HasSlides = [] =:= (SlideKeys -- CurrentSlides),
+    case HasMetricId andalso HasAllCounters andalso HasRates andalso HasSlides of
+        true ->
+            {{ok, already_created}, State0};
+        false ->
+            case handle_create_metrics(State0, Id, Metrics, RateMetrics) of
+                {ok, State} -> {{ok, created}, State};
+                {Result, State} -> {Result, State}
+            end
+    end.

+ 103 - 0
apps/emqx_utils/test/emqx_metrics_worker_SUITE.erl

@@ -429,3 +429,106 @@ t_shift_gauge(_Config) ->
     ?assertEqual(2, emqx_metrics_worker:get_gauge(?NAME, AnotherId, Metric)),
 
     ok.
+
+%% Tests that check the behavior of `ensure_metrics'.
+t_ensure_metrics(_Config) ->
+    Id1 = <<"id1">>,
+    Metrics1 = [c1, {counter, c2}, c3, {slide, s1}, {slide, s2}],
+    RateMetrics1 = [c2, c3],
+    %% Behaves as `create_metrics' if absent
+    ?assertEqual(
+        {ok, created},
+        emqx_metrics_worker:ensure_metrics(?NAME, Id1, Metrics1, RateMetrics1)
+    ),
+    ?assertMatch(
+        #{
+            counters := #{c1 := _, c2 := _, c3 := _},
+            rate := #{c2 := _, c3 := _},
+            gauges := #{},
+            slides := #{s1 := _, s2 := _}
+        },
+        emqx_metrics_worker:get_metrics(?NAME, Id1)
+    ),
+    %% Does nothing if everything is in place
+    ?assertEqual(
+        {ok, already_created},
+        emqx_metrics_worker:ensure_metrics(?NAME, Id1, Metrics1, RateMetrics1)
+    ),
+    ?assertMatch(
+        #{
+            counters := #{c1 := _, c2 := _, c3 := _},
+            rate := #{c2 := _, c3 := _},
+            gauges := #{},
+            slides := #{s1 := _, s2 := _}
+        },
+        emqx_metrics_worker:get_metrics(?NAME, Id1)
+    ),
+
+    %% Does nothing if asked to ensure a subset of existing metrics
+    Metrics2 = [c1],
+    RateMetrics2 = [c3],
+    ?assertEqual(
+        {ok, already_created},
+        emqx_metrics_worker:ensure_metrics(?NAME, Id1, Metrics2, RateMetrics2)
+    ),
+    ?assertEqual(
+        {ok, already_created},
+        emqx_metrics_worker:ensure_metrics(?NAME, Id1, [], [])
+    ),
+    ?assertMatch(
+        #{
+            counters := #{c1 := _, c2 := _, c3 := _},
+            rate := #{c2 := _, c3 := _},
+            gauges := #{},
+            slides := #{s1 := _, s2 := _}
+        },
+        emqx_metrics_worker:get_metrics(?NAME, Id1)
+    ),
+
+    %% If we have an initially smaller set of metrics, `ensure_metrics' will behave as
+    %% `create_metrics' if one is missing.
+    Id2 = <<"id2">>,
+    lists:foreach(
+        fun(
+            #{remove_from_metrics := RemoveFromMetrics, remove_from_rates := RemoveFromRates} = Ctx
+        ) ->
+            ok = emqx_metrics_worker:clear_metrics(?NAME, Id2),
+            Metrics3 = Metrics1 -- RemoveFromMetrics,
+            RateMetrics3 = RateMetrics1 -- RemoveFromRates,
+            ok = emqx_metrics_worker:create_metrics(?NAME, Id2, Metrics3, RateMetrics3),
+            ?assertEqual(
+                {ok, created},
+                emqx_metrics_worker:ensure_metrics(?NAME, Id2, Metrics1, RateMetrics1),
+                Ctx
+            ),
+            ?assertMatch(
+                #{
+                    counters := #{c1 := _, c2 := _, c3 := _},
+                    rate := #{c2 := _, c3 := _},
+                    gauges := #{},
+                    slides := #{s1 := _, s2 := _}
+                },
+                emqx_metrics_worker:get_metrics(?NAME, Id2)
+            ),
+            ok
+        end,
+        [
+            #{
+                remove_from_metrics => [c1],
+                remove_from_rates => []
+            },
+            #{
+                remove_from_metrics => [{counter, c2}],
+                remove_from_rates => [c2]
+            },
+            #{
+                remove_from_metrics => [{slide, s2}],
+                remove_from_rates => []
+            },
+            #{
+                remove_from_metrics => [],
+                remove_from_rates => RateMetrics1
+            }
+        ]
+    ),
+    ok.

+ 11 - 0
changes/ce/fix-14226.en.md

@@ -0,0 +1,11 @@
+Previously, under high stress, the node could lose track of a resource's (action/source) metrics and not be able to recover until the node is reboot.  This is now mitigated by attempting to recreate such metrics when either restarting the resource or resetting its metrics.
+
+Also, warning logs about failures to bump said metrics would flood the logs for "hot-path" metrics such as `matched`.  Now, such logs are throttled to avoid bloating log files.
+
+An example of such throttled log:
+
+```
+2024-11-14T13:56:44.134289+00:00 [warning] tag: RESOURCE, clientid: clientid, msg: handle_resource_metrics_failed, peername: 172.100.239.1:33896, reason: {badkey,matched}, stacktrace: [{erlang,map_get,[matched,#{}],[{error_info,#{module => erl_erts_errors}}]},{emqx_metrics_worker,idx_metric,4,[{file,"src/emqx_metrics_worker.erl"},{line,560}]},...
+
+2024-11-14T13:57:12.490503+00:00 [warning] msg: log_events_throttled_during_last_period, period: 1 minutes, 0 seconds, dropped: #{handle_resource_metrics_failed => 2294}
+```

+ 1 - 0
changes/ce/fix-14266.en.md

@@ -0,0 +1 @@
+Update `emqtt` from 1.13.0 to 1.13.5, please refer to [emqtt's change logs](https://github.com/emqx/emqtt/blob/1.13.5/changelog.md) for more details.

+ 8 - 0
changes/ce/fix-14289.en.md

@@ -0,0 +1,8 @@
+Fix log file path issue when importing config from a different environment.
+
+Environment variable `EMQX_LOG_DIR` in docker is `/opt/emqx/log`, but `/var/log/emqx/` when installed from RPM/DEB packages.
+
+Prior to this fix, log file paths (default file handler and audit handler) are environment-variable interpolated when being exported.
+This causes the config import to a different environment crash due to the directory being absent.
+
+This fix ensures that the log file paths are not environment-variable interpolated when being exported. It also made sure that the absolute paths log directory from old version export are converted back to environment-variable if the path does not exist in the new environment.

+ 1 - 0
changes/ce/fix-14296.en.md

@@ -0,0 +1 @@
+Avoid `ecpool_sup` being blocked by a very slow-starting `ecpool_worker`.

+ 1 - 0
changes/ee/fix-14294.md

@@ -0,0 +1 @@
+Improve logs in `mysql_conn` to remove OTP crash reports.

+ 1 - 0
changes/ee/fix-14298.en.md

@@ -0,0 +1 @@
+Tolerate transient remote shard failures in DS Raft/RocksDB backend that could have caused durable sessions to crash when polling shards for updates.

+ 4 - 11
mix.exs

@@ -206,7 +206,7 @@ defmodule EMQXUmbrella.MixProject do
 
   def common_dep(:cowboy), do: {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}
   def common_dep(:jsone), do: {:jsone, github: "emqx/jsone", tag: "1.7.1", override: true}
-  def common_dep(:ecpool), do: {:ecpool, github: "emqx/ecpool", tag: "0.5.10", override: true}
+  def common_dep(:ecpool), do: {:ecpool, github: "emqx/ecpool", tag: "0.5.12", override: true}
   def common_dep(:replayq), do: {:replayq, github: "emqx/replayq", tag: "0.3.10", override: true}
   def common_dep(:jsx), do: {:jsx, github: "talentdeficit/jsx", tag: "v3.1.0", override: true}
   # in conflict by emqtt and hocon
@@ -246,7 +246,7 @@ defmodule EMQXUmbrella.MixProject do
   def common_dep(:emqtt),
     do:
       {:emqtt,
-       github: "emqx/emqtt", tag: "1.13.0", override: true, system_env: maybe_no_quic_env()}
+       github: "emqx/emqtt", tag: "1.13.5", override: true, system_env: maybe_no_quic_env()}
 
   def common_dep(:typerefl),
     do: {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true}
@@ -1218,7 +1218,7 @@ defmodule EMQXUmbrella.MixProject do
     if enable_quicer?(),
       # in conflict with emqx and emqtt
       do: [
-        {:quicer, github: "emqx/quic", tag: "0.1.9", override: true}
+        {:quicer, github: "emqx/quic", tag: "0.1.10", override: true}
       ],
       else: []
   end
@@ -1231,10 +1231,7 @@ defmodule EMQXUmbrella.MixProject do
 
   def enable_quicer?() do
     "1" == System.get_env("BUILD_WITH_QUIC") or
-      not Enum.any?([
-        macos?(),
-        build_without_quic?()
-      ])
+      not build_without_quic?()
   end
 
   def get_emqx_flavor() do
@@ -1264,10 +1261,6 @@ defmodule EMQXUmbrella.MixProject do
     String.trim(str)
   end
 
-  def macos?() do
-    {:unix, :darwin} == :os.type()
-  end
-
   defp raspbian?() do
     os_cmd("./scripts/get-distro.sh", []) =~ "raspbian"
   end

+ 2 - 2
rebar.config

@@ -87,10 +87,10 @@
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.4.1"}}},
     {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}},
     {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.4.4"}}},
-    {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.10"}}},
+    {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.12"}}},
     {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.10"}}},
     {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
-    {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.13.0"}}},
+    {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.13.5"}}},
     {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.2.1"}}},
     % NOTE: depends on recon 2.5.x
     {observer_cli, "1.7.5"},

+ 1 - 14
rebar.config.erl

@@ -36,7 +36,7 @@ assert_otp() ->
     end.
 
 quicer() ->
-    {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.1.9"}}}.
+    {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.1.10"}}}.
 
 jq() ->
     {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.12"}}}.
@@ -142,23 +142,10 @@ is_community_umbrella_app(_) -> true.
 is_build_without(Name) ->
     "1" =:= os:getenv("BUILD_WITHOUT_" ++ Name).
 
-%% BUILD_WITH_QUIC
-is_build_with(Name) ->
-    "1" =:= os:getenv("BUILD_WITH_" ++ Name).
-
 is_jq_supported() ->
     not is_build_without("JQ").
 
 is_quicer_supported() ->
-    %% for ones who want to build QUIC on macos
-    %% export BUILD_WITH_QUIC=1
-    is_build_with("QUIC") orelse
-        is_quicer_supported(os:type()).
-
-is_quicer_supported({unix, darwin}) ->
-    %% no quic on macos so far
-    false;
-is_quicer_supported(_) ->
     not is_build_without("QUIC").
 
 is_rocksdb_supported() ->