Procházet zdrojové kódy

Merge remote-tracking branch 'origin/release-53' into 1114-sync-release-53

Zaiming (Stone) Shi před 2 roky
rodič
revize
6eb3bb7cff
32 změnil soubory, kde provedl 330 přidání a 142 odebrání
  1. 1 1
      apps/emqx/rebar.config.script
  2. 11 4
      apps/emqx/src/emqx_cm.erl
  3. 3 1
      apps/emqx/src/proto/emqx_cm_proto_v2.erl
  4. 5 0
      apps/emqx_auth/include/emqx_authz.hrl
  5. 3 1
      apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl
  6. 17 8
      apps/emqx_auth/src/emqx_authz/emqx_authz.erl
  7. 9 2
      apps/emqx_auth/test/emqx_authn/emqx_authn_init_SUITE.erl
  8. 90 78
      apps/emqx_auth/test/emqx_authz/emqx_authz_SUITE.erl
  9. 21 22
      apps/emqx_auth/test/emqx_authz/emqx_authz_api_sources_SUITE.erl
  10. 46 0
      apps/emqx_auth/test/emqx_authz/emqx_authz_fake_source.erl
  11. 18 0
      apps/emqx_auth/test/emqx_authz/emqx_authz_test_lib.erl
  12. 1 1
      apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src
  13. 1 1
      apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl
  14. 1 1
      apps/emqx_bridge_http/src/emqx_bridge_http.app.src
  15. 9 4
      apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl
  16. 2 2
      apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl
  17. 2 2
      apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl
  18. 1 1
      apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src
  19. 1 1
      apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl
  20. 4 1
      apps/emqx_gcp_device/src/emqx_gcp_device.erl
  21. 10 4
      apps/emqx_resource/include/emqx_resource.hrl
  22. 14 0
      apps/emqx_resource/src/emqx_resource.erl
  23. 48 3
      apps/emqx_resource/src/emqx_resource_buffer_worker.erl
  24. 2 2
      apps/emqx_resource/src/emqx_resource_manager.erl
  25. 2 0
      changes/ce/fix-11747.en.md
  26. 2 0
      changes/ce/fix-11747.zh.md
  27. 1 0
      changes/ee/fix-11724.en.md
  28. 1 0
      changes/ee/fix-11733.en.md
  29. 1 0
      changes/ee/fix-11750.en.md
  30. 1 0
      changes/ee/fix-11760.en.md
  31. 1 1
      mix.exs
  32. 1 1
      rebar.config.erl

+ 1 - 1
apps/emqx/rebar.config.script

@@ -24,7 +24,7 @@ IsQuicSupp = fun() ->
 end,
 
 Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}},
-Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.201"}}}.
+Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.202"}}}.
 
 Dialyzer = fun(Config) ->
     {dialyzer, OldDialyzerConfig} = lists:keyfind(dialyzer, 1, Config),

+ 11 - 4
apps/emqx/src/emqx_cm.erl

@@ -298,9 +298,9 @@ takeover_session_begin(ClientId) ->
 
 takeover_session_begin(ClientId, ChanPid) when is_pid(ChanPid) ->
     case takeover_session(ClientId, ChanPid) of
-        {living, ConnMod, Session} ->
+        {living, ConnMod, ChanPid, Session} ->
             {ok, Session, {ConnMod, ChanPid}};
-        none ->
+        _ ->
             none
     end;
 takeover_session_begin(_ClientId, undefined) ->
@@ -368,13 +368,20 @@ do_takeover_begin(ClientId, ChanPid) when node(ChanPid) == node() ->
         ConnMod when is_atom(ConnMod) ->
             case request_stepdown({takeover, 'begin'}, ConnMod, ChanPid) of
                 {ok, Session} ->
-                    {living, ConnMod, Session};
+                    {living, ConnMod, ChanPid, Session};
                 {error, Reason} ->
                     error(Reason)
             end
     end;
 do_takeover_begin(ClientId, ChanPid) ->
-    wrap_rpc(emqx_cm_proto_v2:takeover_session(ClientId, ChanPid)).
+    case wrap_rpc(emqx_cm_proto_v2:takeover_session(ClientId, ChanPid)) of
+        %% NOTE: v5.3.0
+        {living, ConnMod, Session} ->
+            {living, ConnMod, ChanPid, Session};
+        %% NOTE: other versions
+        Res ->
+            Res
+    end.
 
 %% @doc Discard all the sessions identified by the ClientId.
 -spec discard_session(emqx_types:clientid()) -> ok.

+ 3 - 1
apps/emqx/src/proto/emqx_cm_proto_v2.erl

@@ -65,8 +65,10 @@ get_chann_conn_mod(ClientId, ChanPid) ->
 
 -spec takeover_session(emqx_types:clientid(), emqx_cm:chan_pid()) ->
     none
-    | {expired | persistent, emqx_session:session()}
     | {living, _ConnMod :: atom(), emqx_cm:chan_pid(), emqx_session:session()}
+    %% NOTE: v5.3.0
+    | {living, _ConnMod :: atom(), emqx_session:session()}
+    | {expired | persistent, emqx_session:session()}
     | {badrpc, _}.
 takeover_session(ClientId, ChanPid) ->
     rpc:call(node(ChanPid), emqx_cm, takeover_session, [ClientId, ChanPid], ?T_TAKEOVER * 2).

+ 5 - 0
apps/emqx_auth/include/emqx_authz.hrl

@@ -163,3 +163,8 @@
 
 -define(DEFAULT_RULE_QOS, [0, 1, 2]).
 -define(DEFAULT_RULE_RETAIN, all).
+
+-define(BUILTIN_SOURCES, [
+    {client_info, emqx_authz_client_info},
+    {file, emqx_authz_file}
+]).

+ 3 - 1
apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl

@@ -26,6 +26,7 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/emqx_hooks.hrl").
 -include_lib("stdlib/include/ms_transform.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(CONF_ROOT, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM).
 
@@ -446,7 +447,7 @@ handle_continue(initialize_authentication, #{init_done := true} = State) ->
     {noreply, State};
 handle_continue(initialize_authentication, #{providers := Providers} = State) ->
     InitDone = initialize_authentication(Providers),
-    {noreply, State#{init_done := InitDone}}.
+    {noreply, maybe_hook(State#{init_done := InitDone})}.
 
 handle_cast(Req, State) ->
     ?SLOG(error, #{msg => "unexpected_cast", cast => Req}),
@@ -495,6 +496,7 @@ do_initialize_authentication(Providers, Chains, _HasProviders = true) ->
         Chains
     ),
     ok = unhook_deny(),
+    ?tp(info, authn_chains_initialization_done, #{}),
     true.
 
 initialize_chain_authentication(_Providers, _ChainName, []) ->

+ 17 - 8
apps/emqx_auth/src/emqx_authz/emqx_authz.erl

@@ -88,8 +88,7 @@ init() ->
     emqx_conf:add_handler(?CONF_KEY_PATH, ?MODULE),
     emqx_conf:add_handler(?ROOT_KEY, ?MODULE),
     ok = emqx_hooks:put('client.authorize', {?MODULE, authorize_deny, []}, ?HP_AUTHZ),
-    ok = register_source(client_info, emqx_authz_client_info),
-    ok = register_source(file, emqx_authz_file),
+    ok = register_builtin_sources(),
     ok.
 
 register_source(Type, Module) ->
@@ -124,6 +123,14 @@ are_all_providers_registered() ->
             false
     end.
 
+register_builtin_sources() ->
+    lists:foreach(
+        fun({Type, Module}) ->
+            register_source(Type, Module)
+        end,
+        ?BUILTIN_SOURCES
+    ).
+
 configured_types() ->
     lists:map(
         fun(#{type := Type}) -> Type end,
@@ -186,8 +193,14 @@ pre_config_update(Path, Cmd, Sources) ->
         {error, Reason} -> {error, Reason};
         NSources -> {ok, NSources}
     catch
-        Error:Reason:Stack ->
+        throw:Reason ->
             ?SLOG(info, #{
+                msg => "error_in_pre_config_update",
+                reason => Reason
+            }),
+            {error, Reason};
+        Error:Reason:Stack ->
+            ?SLOG(warning, #{
                 msg => "error_in_pre_config_update",
                 exception => Error,
                 reason => Reason,
@@ -596,10 +609,6 @@ maybe_convert_sources(
 maybe_convert_sources(RawConf, _Fun) ->
     RawConf.
 
-% read_acl_file(#{<<"path">> := Path} = Source) ->
-%     {ok, Rules} = emqx_authz_file:read_file(Path),
-%     maps:remove(<<"path">>, Source#{<<"rules">> => Rules}).
-
 %%------------------------------------------------------------------------------
 %% Extended Features
 %%------------------------------------------------------------------------------
@@ -706,7 +715,7 @@ maybe_read_source_files_safe(Source0) ->
     catch
         Error:Reason:Stacktrace ->
             ?SLOG(error, #{
-                msg => "error_in_maybe_read_source_files",
+                msg => "error_when_reading_source_files",
                 exception => Error,
                 reason => Reason,
                 stacktrace => Stacktrace

+ 9 - 2
apps/emqx_auth/test/emqx_authn/emqx_authn_init_SUITE.erl

@@ -19,6 +19,7 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-include_lib("emqx/include/asserts.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
@@ -48,9 +49,11 @@ init_per_testcase(_Case, Config) ->
             work_dir => ?config(priv_dir, Config)
         }
     ),
+    ok = snabbkaffe:start_trace(),
     [{apps, Apps} | Config].
 
 end_per_testcase(_Case, Config) ->
+    ok = snabbkaffe:stop(),
     _ = application:stop(emqx_auth),
     ok = emqx_cth_suite:stop(?config(apps, Config)),
     ok.
@@ -66,10 +69,14 @@ t_initialize(_Config) ->
         emqx_access_control:authenticate(?CLIENTINFO)
     ),
 
-    ok = emqx_authn_test_lib:register_fake_providers([{password_based, built_in_database}]),
+    ?assertWaitEvent(
+        ok = emqx_authn_test_lib:register_fake_providers([{password_based, built_in_database}]),
+        #{?snk_kind := authn_chains_initialization_done},
+        100
+    ),
 
     ?assertMatch(
-        {error, not_authorized},
+        {error, bad_username_or_password},
         emqx_access_control:authenticate(?CLIENTINFO)
     ),
 

+ 90 - 78
apps/emqx_auth/test/emqx_authz/emqx_authz_SUITE.erl

@@ -20,7 +20,6 @@
 
 -include("emqx_authz.hrl").
 -include_lib("emqx/include/emqx.hrl").
--include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("emqx/include/emqx_placeholder.hrl").
@@ -35,33 +34,18 @@ groups() ->
     [].
 
 init_per_suite(Config) ->
-    meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
-    meck:expect(emqx_resource, create_local, fun(_, _, _, _) -> {ok, meck_data} end),
-    meck:expect(emqx_resource, remove_local, fun(_) -> ok end),
-    meck:expect(
-        emqx_authz_file,
-        acl_conf_file,
-        fun() ->
-            emqx_common_test_helpers:deps_path(emqx_auth, "etc/acl.conf")
-        end
-    ),
     Apps = emqx_cth_suite:start(
         [
             emqx,
             {emqx_conf,
                 "authorization { cache { enable = false }, no_match = deny, sources = [] }"},
-            emqx_auth,
-            emqx_auth_http,
-            emqx_auth_mnesia,
-            emqx_auth_redis,
-            emqx_auth_postgresql,
-            emqx_auth_mysql,
-            emqx_auth_mongodb
+            emqx_auth
         ],
         #{
             work_dir => filename:join(?config(priv_dir, Config), ?MODULE)
         }
     ),
+    ok = emqx_authz_test_lib:register_fake_sources([http, redis, mongodb, mysql, postgresql]),
     [{suite_apps, Apps} | Config].
 
 end_per_suite(Config) ->
@@ -73,8 +57,8 @@ end_per_suite(Config) ->
             <<"sources">> => []
         }
     ),
+    ok = emqx_authz_test_lib:deregister_sources(),
     emqx_cth_suite:stop(?config(suite_apps, Config)),
-    meck:unload(emqx_resource),
     ok.
 
 init_per_testcase(TestCase, Config) when
@@ -102,7 +86,7 @@ end_per_testcase(_TestCase, _Config) ->
     emqx_common_test_helpers:call_janitor(),
     ok.
 
--define(SOURCE1, #{
+-define(SOURCE_HTTP, #{
     <<"type">> => <<"http">>,
     <<"enable">> => true,
     <<"url">> => <<"https://example.com:443/a/b?c=d">>,
@@ -111,7 +95,7 @@ end_per_testcase(_TestCase, _Config) ->
     <<"method">> => <<"get">>,
     <<"request_timeout">> => <<"5s">>
 }).
--define(SOURCE2, #{
+-define(SOURCE_MONGODB, #{
     <<"type">> => <<"mongodb">>,
     <<"enable">> => true,
     <<"mongo_type">> => <<"single">>,
@@ -123,7 +107,7 @@ end_per_testcase(_TestCase, _Config) ->
     <<"collection">> => <<"authz">>,
     <<"filter">> => #{<<"a">> => <<"b">>}
 }).
--define(SOURCE3, #{
+-define(SOURCE_MYSQL, #{
     <<"type">> => <<"mysql">>,
     <<"enable">> => true,
     <<"server">> => <<"127.0.0.1:27017">>,
@@ -135,7 +119,7 @@ end_per_testcase(_TestCase, _Config) ->
     <<"ssl">> => #{<<"enable">> => false},
     <<"query">> => <<"abcb">>
 }).
--define(SOURCE4, #{
+-define(SOURCE_POSTGRESQL, #{
     <<"type">> => <<"postgresql">>,
     <<"enable">> => true,
     <<"server">> => <<"127.0.0.1:27017">>,
@@ -147,7 +131,7 @@ end_per_testcase(_TestCase, _Config) ->
     <<"ssl">> => #{<<"enable">> => false},
     <<"query">> => <<"abcb">>
 }).
--define(SOURCE5, #{
+-define(SOURCE_REDIS, #{
     <<"type">> => <<"redis">>,
     <<"redis_type">> => <<"single">>,
     <<"enable">> => true,
@@ -160,22 +144,22 @@ end_per_testcase(_TestCase, _Config) ->
     <<"cmd">> => <<"HGETALL mqtt_authz:", ?PH_USERNAME/binary>>
 }).
 
--define(FILE_SOURCE(Rules), #{
+-define(SOURCE_FILE(Rules), #{
     <<"type">> => <<"file">>,
     <<"enable">> => true,
     <<"rules">> => Rules
 }).
 
--define(SOURCE6,
-    ?FILE_SOURCE(
+-define(SOURCE_FILE1,
+    ?SOURCE_FILE(
         <<
             "{allow,{username,\"^dashboard?\"},subscribe,[\"$SYS/#\"]}."
             "\n{allow,{ipaddr,\"127.0.0.1\"},all,[\"$SYS/#\",\"#\"]}."
         >>
     )
 ).
--define(SOURCE7,
-    ?FILE_SOURCE(
+-define(SOURCE_FILE2,
+    ?SOURCE_FILE(
         <<
             "{allow,{username,\"some_client\"},publish,[\"some_client/lwt\"]}.\n"
             "{deny, all}."
@@ -183,15 +167,6 @@ end_per_testcase(_TestCase, _Config) ->
     )
 ).
 
--define(BAD_FILE_SOURCE2, #{
-    <<"type">> => <<"file">>,
-    <<"enable">> => true,
-    <<"rules">> =>
-        <<
-            "{not_allow,{username,\"some_client\"},publish,[\"some_client/lwt\"]}."
-        >>
-}).
-
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -199,24 +174,23 @@ end_per_testcase(_TestCase, _Config) ->
 -define(UPDATE_ERROR(Err), {error, {pre_config_update, emqx_authz, Err}}).
 
 t_bad_file_source(_) ->
-    BadContent = ?FILE_SOURCE(<<"{allow,{username,\"bar\"}, publish, [\"test\"]}">>),
+    BadContent = ?SOURCE_FILE(<<"{allow,{username,\"bar\"}, publish, [\"test\"]}">>),
     BadContentErr = {bad_acl_file_content, {1, erl_parse, ["syntax error before: ", []]}},
-    BadRule = ?FILE_SOURCE(<<"{allow,{username,\"bar\"},publish}.">>),
+    BadRule = ?SOURCE_FILE(<<"{allow,{username,\"bar\"},publish}.">>),
     BadRuleErr = {invalid_authorization_rule, {allow, {username, "bar"}, publish}},
-    BadPermission = ?FILE_SOURCE(<<"{not_allow,{username,\"bar\"},publish,[\"test\"]}.">>),
+    BadPermission = ?SOURCE_FILE(<<"{not_allow,{username,\"bar\"},publish,[\"test\"]}.">>),
     BadPermissionErr = {invalid_authorization_permission, not_allow},
-    BadAction = ?FILE_SOURCE(<<"{allow,{username,\"bar\"},pubsub,[\"test\"]}.">>),
+    BadAction = ?SOURCE_FILE(<<"{allow,{username,\"bar\"},pubsub,[\"test\"]}.">>),
     BadActionErr = {invalid_authorization_action, pubsub},
     lists:foreach(
         fun({Source, Error}) ->
             File = emqx_authz_file:acl_conf_file(),
-            {ok, Bin1} = file:read_file(File),
+            ?assertEqual({error, enoent}, file:read_file(File)),
             ?assertEqual(?UPDATE_ERROR(Error), emqx_authz:update(?CMD_REPLACE, [Source])),
             ?assertEqual(?UPDATE_ERROR(Error), emqx_authz:update(?CMD_PREPEND, Source)),
             ?assertEqual(?UPDATE_ERROR(Error), emqx_authz:update(?CMD_APPEND, Source)),
-            %% Check file content not changed if update failed
-            {ok, Bin2} = file:read_file(File),
-            ?assertEqual(Bin1, Bin2)
+            %% Check file is not created if update failed;
+            ?assertEqual({error, enoent}, file:read_file(File))
         end,
         [
             {BadContent, BadContentErr},
@@ -230,14 +204,32 @@ t_bad_file_source(_) ->
         emqx_conf:get([authorization, sources], [])
     ).
 
+t_good_file_source(_) ->
+    RuleBin = <<"{allow,{username,\"bar\"}, publish, [\"test\"]}.">>,
+    GoodFileSource = ?SOURCE_FILE(RuleBin),
+    File = emqx_authz_file:acl_conf_file(),
+    lists:foreach(
+        fun({Command, Argument}) ->
+            _ = file:delete(File),
+            ?assertMatch({ok, _}, emqx_authz:update(Command, Argument)),
+            ?assertEqual({ok, RuleBin}, file:read_file(File)),
+            {ok, _} = emqx_authz:update(?CMD_REPLACE, [])
+        end,
+        [
+            {?CMD_REPLACE, [GoodFileSource]},
+            {?CMD_PREPEND, GoodFileSource},
+            {?CMD_APPEND, GoodFileSource}
+        ]
+    ).
+
 t_update_source(_) ->
     %% replace all
-    {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE3]),
-    {ok, _} = emqx_authz:update(?CMD_PREPEND, ?SOURCE2),
-    {ok, _} = emqx_authz:update(?CMD_PREPEND, ?SOURCE1),
-    {ok, _} = emqx_authz:update(?CMD_APPEND, ?SOURCE4),
-    {ok, _} = emqx_authz:update(?CMD_APPEND, ?SOURCE5),
-    {ok, _} = emqx_authz:update(?CMD_APPEND, ?SOURCE6),
+    {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE_MYSQL]),
+    {ok, _} = emqx_authz:update(?CMD_PREPEND, ?SOURCE_MONGODB),
+    {ok, _} = emqx_authz:update(?CMD_PREPEND, ?SOURCE_HTTP),
+    {ok, _} = emqx_authz:update(?CMD_APPEND, ?SOURCE_POSTGRESQL),
+    {ok, _} = emqx_authz:update(?CMD_APPEND, ?SOURCE_REDIS),
+    {ok, _} = emqx_authz:update(?CMD_APPEND, ?SOURCE_FILE1),
 
     ?assertMatch(
         [
@@ -251,19 +243,23 @@ t_update_source(_) ->
         emqx_conf:get([authorization, sources], [])
     ),
 
-    {ok, _} = emqx_authz:update({?CMD_REPLACE, http}, ?SOURCE1#{<<"enable">> := true}),
-    {ok, _} = emqx_authz:update({?CMD_REPLACE, mongodb}, ?SOURCE2#{<<"enable">> := true}),
-    {ok, _} = emqx_authz:update({?CMD_REPLACE, mysql}, ?SOURCE3#{<<"enable">> := true}),
-    {ok, _} = emqx_authz:update({?CMD_REPLACE, postgresql}, ?SOURCE4#{<<"enable">> := true}),
-    {ok, _} = emqx_authz:update({?CMD_REPLACE, redis}, ?SOURCE5#{<<"enable">> := true}),
-    {ok, _} = emqx_authz:update({?CMD_REPLACE, file}, ?SOURCE6#{<<"enable">> := true}),
-
-    {ok, _} = emqx_authz:update({?CMD_REPLACE, http}, ?SOURCE1#{<<"enable">> := false}),
-    {ok, _} = emqx_authz:update({?CMD_REPLACE, mongodb}, ?SOURCE2#{<<"enable">> := false}),
-    {ok, _} = emqx_authz:update({?CMD_REPLACE, mysql}, ?SOURCE3#{<<"enable">> := false}),
-    {ok, _} = emqx_authz:update({?CMD_REPLACE, postgresql}, ?SOURCE4#{<<"enable">> := false}),
-    {ok, _} = emqx_authz:update({?CMD_REPLACE, redis}, ?SOURCE5#{<<"enable">> := false}),
-    {ok, _} = emqx_authz:update({?CMD_REPLACE, file}, ?SOURCE6#{<<"enable">> := false}),
+    {ok, _} = emqx_authz:update({?CMD_REPLACE, http}, ?SOURCE_HTTP#{<<"enable">> := true}),
+    {ok, _} = emqx_authz:update({?CMD_REPLACE, mongodb}, ?SOURCE_MONGODB#{<<"enable">> := true}),
+    {ok, _} = emqx_authz:update({?CMD_REPLACE, mysql}, ?SOURCE_MYSQL#{<<"enable">> := true}),
+    {ok, _} = emqx_authz:update({?CMD_REPLACE, postgresql}, ?SOURCE_POSTGRESQL#{
+        <<"enable">> := true
+    }),
+    {ok, _} = emqx_authz:update({?CMD_REPLACE, redis}, ?SOURCE_REDIS#{<<"enable">> := true}),
+    {ok, _} = emqx_authz:update({?CMD_REPLACE, file}, ?SOURCE_FILE1#{<<"enable">> := true}),
+
+    {ok, _} = emqx_authz:update({?CMD_REPLACE, http}, ?SOURCE_HTTP#{<<"enable">> := false}),
+    {ok, _} = emqx_authz:update({?CMD_REPLACE, mongodb}, ?SOURCE_MONGODB#{<<"enable">> := false}),
+    {ok, _} = emqx_authz:update({?CMD_REPLACE, mysql}, ?SOURCE_MYSQL#{<<"enable">> := false}),
+    {ok, _} = emqx_authz:update({?CMD_REPLACE, postgresql}, ?SOURCE_POSTGRESQL#{
+        <<"enable">> := false
+    }),
+    {ok, _} = emqx_authz:update({?CMD_REPLACE, redis}, ?SOURCE_REDIS#{<<"enable">> := false}),
+    {ok, _} = emqx_authz:update({?CMD_REPLACE, file}, ?SOURCE_FILE1#{<<"enable">> := false}),
 
     ?assertMatch(
         [
@@ -295,7 +291,12 @@ t_replace_all(_) ->
     Conf = emqx:get_raw_config(RootKey),
     emqx_authz_utils:update_config(RootKey, Conf#{
         <<"sources">> => [
-            ?SOURCE6, ?SOURCE5, ?SOURCE4, ?SOURCE3, ?SOURCE2, ?SOURCE1
+            ?SOURCE_FILE1,
+            ?SOURCE_REDIS,
+            ?SOURCE_POSTGRESQL,
+            ?SOURCE_MYSQL,
+            ?SOURCE_MONGODB,
+            ?SOURCE_HTTP
         ]
     }),
     %% config
@@ -335,7 +336,7 @@ t_replace_all(_) ->
         {ok, _},
         emqx_authz_utils:update_config(
             RootKey,
-            Conf#{<<"sources">> => [?SOURCE1#{<<"enable">> => false}]}
+            Conf#{<<"sources">> => [?SOURCE_HTTP#{<<"enable">> => false}]}
         )
     ),
     %% hooks status
@@ -351,7 +352,7 @@ t_replace_all(_) ->
     ok.
 
 t_delete_source(_) ->
-    {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE1]),
+    {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE_HTTP]),
 
     ?assertMatch([#{type := http, enable := true}], emqx_conf:get([authorization, sources], [])),
 
@@ -363,12 +364,12 @@ t_move_source(_) ->
     {ok, _} = emqx_authz:update(
         ?CMD_REPLACE,
         [
-            ?SOURCE1,
-            ?SOURCE2,
-            ?SOURCE3,
-            ?SOURCE4,
-            ?SOURCE5,
-            ?SOURCE6
+            ?SOURCE_HTTP,
+            ?SOURCE_MONGODB,
+            ?SOURCE_MYSQL,
+            ?SOURCE_POSTGRESQL,
+            ?SOURCE_REDIS,
+            ?SOURCE_FILE1
         ]
     ),
     ?assertMatch(
@@ -437,15 +438,26 @@ t_move_source(_) ->
 
     ok.
 
+t_pre_config_update_crash(_) ->
+    ok = meck:new(emqx_authz_fake_source, [non_strict, passthrough, no_history]),
+    ok = meck:expect(emqx_authz_fake_source, write_files, fun(_) -> meck:exception(error, oops) end),
+    ?assertEqual(
+        {error, {pre_config_update, emqx_authz, oops}},
+        emqx_authz:update(?CMD_APPEND, ?SOURCE_HTTP)
+    ),
+    ok = meck:unload(emqx_authz_fake_source).
+
 t_get_enabled_authzs_none_enabled(_Config) ->
     ?assertEqual([], emqx_authz:get_enabled_authzs()).
 
 t_get_enabled_authzs_some_enabled(_Config) ->
-    {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE4, ?SOURCE5#{<<"enable">> := false}]),
+    {ok, _} = emqx_authz:update(?CMD_REPLACE, [
+        ?SOURCE_POSTGRESQL, ?SOURCE_REDIS#{<<"enable">> := false}
+    ]),
     ?assertEqual([postgresql], emqx_authz:get_enabled_authzs()).
 
 t_subscribe_deny_disconnect_publishes_last_will_testament(_Config) ->
-    {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE7]),
+    {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE_FILE2]),
     {ok, C} = emqtt:start_link([
         {username, <<"some_client">>},
         {will_topic, <<"some_client/lwt">>},
@@ -473,7 +485,7 @@ t_subscribe_deny_disconnect_publishes_last_will_testament(_Config) ->
     ok.
 
 t_publish_deny_disconnect_publishes_last_will_testament(_Config) ->
-    {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE7]),
+    {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE_FILE2]),
     {ok, C} = emqtt:start_link([
         {username, <<"some_client">>},
         {will_topic, <<"some_client/lwt">>},
@@ -530,7 +542,7 @@ t_publish_last_will_testament_denied_topic(_Config) ->
 %% and then gets banned and kicked out while connected.  Should not
 %% publish LWT.
 t_publish_last_will_testament_banned_client_connecting(_Config) ->
-    {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE7]),
+    {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE_FILE2]),
     Username = <<"some_client">>,
     ClientId = <<"some_clientid">>,
     LWTPayload = <<"should not be published">>,

+ 21 - 22
apps/emqx_auth/test/emqx_authz/emqx_authz_api_sources_SUITE.erl

@@ -29,7 +29,7 @@
 -define(PGSQL_HOST, "pgsql").
 -define(REDIS_SINGLE_HOST, "redis").
 
--define(SOURCE1, #{
+-define(SOURCE_REDIS1, #{
     <<"type">> => <<"http">>,
     <<"enable">> => true,
     <<"url">> => <<"https://fake.com:443/acl?username=", ?PH_USERNAME/binary>>,
@@ -38,7 +38,7 @@
     <<"method">> => <<"get">>,
     <<"request_timeout">> => <<"5s">>
 }).
--define(SOURCE2, #{
+-define(SOURCE_MONGODB, #{
     <<"type">> => <<"mongodb">>,
     <<"enable">> => true,
     <<"mongo_type">> => <<"single">>,
@@ -50,7 +50,7 @@
     <<"collection">> => <<"fake">>,
     <<"filter">> => #{<<"a">> => <<"b">>}
 }).
--define(SOURCE3, #{
+-define(SOURCE_MYSQL, #{
     <<"type">> => <<"mysql">>,
     <<"enable">> => true,
     <<"server">> => <<?MYSQL_HOST>>,
@@ -62,7 +62,7 @@
     <<"ssl">> => #{<<"enable">> => false},
     <<"query">> => <<"abcb">>
 }).
--define(SOURCE4, #{
+-define(SOURCE_POSTGRESQL, #{
     <<"type">> => <<"postgresql">>,
     <<"enable">> => true,
     <<"server">> => <<?PGSQL_HOST>>,
@@ -74,7 +74,7 @@
     <<"ssl">> => #{<<"enable">> => false},
     <<"query">> => <<"abcb">>
 }).
--define(SOURCE5, #{
+-define(SOURCE_REDIS2, #{
     <<"type">> => <<"redis">>,
     <<"enable">> => true,
     <<"servers">> => <<?REDIS_SINGLE_HOST, ",127.0.0.1:6380">>,
@@ -85,7 +85,7 @@
     <<"ssl">> => #{<<"enable">> => false},
     <<"cmd">> => <<"HGETALL mqtt_authz:", ?PH_USERNAME/binary>>
 }).
--define(SOURCE6, #{
+-define(SOURCE_FILE, #{
     <<"type">> => <<"file">>,
     <<"enable">> => true,
     <<"rules">> =>
@@ -120,12 +120,6 @@ init_per_suite(Config) ->
             {emqx_conf,
                 "authorization { cache { enable = false }, no_match = deny, sources = [] }"},
             emqx_auth,
-            emqx_auth_http,
-            emqx_auth_mnesia,
-            emqx_auth_redis,
-            emqx_auth_postgresql,
-            emqx_auth_mysql,
-            emqx_auth_mongodb,
             emqx_management,
             {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
         ],
@@ -133,6 +127,7 @@ init_per_suite(Config) ->
             work_dir => filename:join(?config(priv_dir, Config), ?MODULE)
         }
     ),
+    ok = emqx_authz_test_lib:register_fake_sources([http, mongodb, mysql, postgresql, redis]),
     _ = emqx_common_test_http:create_default_app(),
     [{suite_apps, Apps} | Config].
 
@@ -192,9 +187,11 @@ t_api(_) ->
         begin
             {ok, 204, _} = request(post, uri(["authorization", "sources"]), Source)
         end
-     || Source <- lists:reverse([?SOURCE2, ?SOURCE3, ?SOURCE4, ?SOURCE5, ?SOURCE6])
+     || Source <- lists:reverse([
+            ?SOURCE_MONGODB, ?SOURCE_MYSQL, ?SOURCE_POSTGRESQL, ?SOURCE_REDIS2, ?SOURCE_FILE
+        ])
     ],
-    {ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE1),
+    {ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE_REDIS1),
 
     {ok, 200, Result2} = request(get, uri(["authorization", "sources"]), []),
     Sources = get_sources(Result2),
@@ -214,7 +211,7 @@ t_api(_) ->
     {ok, 204, _} = request(
         put,
         uri(["authorization", "sources", "http"]),
-        ?SOURCE1#{<<"enable">> := false}
+        ?SOURCE_REDIS1#{<<"enable">> := false}
     ),
     {ok, 200, Result3} = request(get, uri(["authorization", "sources", "http"]), []),
     ?assertMatch(
@@ -238,7 +235,7 @@ t_api(_) ->
     {ok, 204, _} = request(
         put,
         uri(["authorization", "sources", "mongodb"]),
-        ?SOURCE2#{
+        ?SOURCE_MONGODB#{
             <<"ssl">> => #{
                 <<"enable">> => <<"true">>,
                 <<"cacertfile">> => Cacertfile,
@@ -279,7 +276,7 @@ t_api(_) ->
     {ok, 204, _} = request(
         put,
         uri(["authorization", "sources", "mongodb"]),
-        ?SOURCE2#{
+        ?SOURCE_MONGODB#{
             <<"ssl">> => #{
                 <<"enable">> => <<"true">>,
                 <<"cacertfile">> => Cacert,
@@ -329,19 +326,19 @@ t_api(_) ->
     {ok, 204, _} = request(
         put,
         uri(["authorization", "sources", "mysql"]),
-        ?SOURCE3#{<<"server">> := <<"192.168.1.100:3306">>}
+        ?SOURCE_MYSQL#{<<"server">> := <<"192.168.1.100:3306">>}
     ),
 
     {ok, 204, _} = request(
         put,
         uri(["authorization", "sources", "postgresql"]),
-        ?SOURCE4#{<<"server">> := <<"fake">>}
+        ?SOURCE_POSTGRESQL#{<<"server">> := <<"fake">>}
     ),
 
     {ok, 204, _} = request(
         put,
         uri(["authorization", "sources", "redis"]),
-        ?SOURCE5#{
+        ?SOURCE_REDIS2#{
             <<"servers">> := [
                 <<"192.168.1.100:6379">>,
                 <<"192.168.1.100:6380">>
@@ -413,7 +410,7 @@ t_api(_) ->
         #{<<"type">> => <<"built_in_database">>, <<"enable">> => false}
     ),
 
-    {ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE6),
+    {ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE_FILE),
 
     {ok, Client} = emqtt:start_link(
         [
@@ -505,7 +502,9 @@ t_api(_) ->
     ok.
 
 t_source_move(_) ->
-    {ok, _} = emqx_authz:update(replace, [?SOURCE1, ?SOURCE2, ?SOURCE3, ?SOURCE4, ?SOURCE5]),
+    {ok, _} = emqx_authz:update(replace, [
+        ?SOURCE_REDIS1, ?SOURCE_MONGODB, ?SOURCE_MYSQL, ?SOURCE_POSTGRESQL, ?SOURCE_REDIS2
+    ]),
     ?assertMatch(
         [
             #{type := http},

+ 46 - 0
apps/emqx_auth/test/emqx_authz/emqx_authz_fake_source.erl

@@ -0,0 +1,46 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_authz_fake_source).
+
+-behaviour(emqx_authz_source).
+
+%% APIs
+-export([
+    description/0,
+    create/1,
+    update/1,
+    destroy/1,
+    authorize/4
+]).
+
+%%--------------------------------------------------------------------
+%% emqx_authz callbacks
+%%--------------------------------------------------------------------
+
+description() ->
+    "Fake AuthZ".
+
+create(Source) ->
+    Source.
+
+update(Source) ->
+    Source.
+
+destroy(_Source) -> ok.
+
+authorize(_Client, _PubSub, _Topic, _Source) ->
+    nomatch.

+ 18 - 0
apps/emqx_auth/test/emqx_authz/emqx_authz_test_lib.erl

@@ -51,6 +51,24 @@ setup_config(BaseConfig, SpecialParams) ->
         {error, Reason} -> {error, Reason}
     end.
 
+register_fake_sources(SourceTypes) ->
+    lists:foreach(
+        fun(Type) ->
+            emqx_authz_source_registry:register(Type, emqx_authz_fake_source)
+        end,
+        SourceTypes
+    ).
+
+deregister_sources() ->
+    {BuiltInTypes, _} = lists:unzip(?BUILTIN_SOURCES),
+    SourceTypes = emqx_authz_source_registry:get(),
+    lists:foreach(
+        fun(Type) ->
+            emqx_authz_source_registry:register(Type, emqx_authz_fake_source)
+        end,
+        SourceTypes -- BuiltInTypes
+    ).
+
 %%--------------------------------------------------------------------
 %% Table-based test helpers
 %%--------------------------------------------------------------------

+ 1 - 1
apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_cassandra, [
     {description, "EMQX Enterprise Cassandra Bridge"},
-    {vsn, "0.1.4"},
+    {vsn, "0.1.5"},
     {registered, []},
     {applications, [
         kernel,

+ 1 - 1
apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl

@@ -332,7 +332,7 @@ on_get_status(_InstId, #{pool_name := PoolName} = State) ->
     end.
 
 do_get_status(Conn) ->
-    ok == element(1, ecql:query(Conn, "SELECT count(1) AS T FROM system.local")).
+    ok == element(1, ecql:query(Conn, "SELECT cluster_name FROM system.local")).
 
 do_check_prepares(#{prepare_cql := Prepares}) when is_map(Prepares) ->
     ok;

+ 1 - 1
apps/emqx_bridge_http/src/emqx_bridge_http.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_http, [
     {description, "EMQX HTTP Bridge and Connector Application"},
-    {vsn, "0.1.3"},
+    {vsn, "0.1.4"},
     {registered, []},
     {applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]},
     {env, []},

+ 9 - 4
apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl

@@ -57,6 +57,8 @@
 -define(DEFAULT_PIPELINE_SIZE, 100).
 -define(DEFAULT_REQUEST_TIMEOUT_MS, 30_000).
 
+-define(READACT_REQUEST_NOTE, "the request body is redacted due to security reasons").
+
 %%=====================================================================
 %% Hocon schema
 
@@ -303,7 +305,8 @@ on_query(
         "QUERY",
         "http_connector_received",
         #{
-            request => redact(Request),
+            request => redact_request(Request),
+            note => ?READACT_REQUEST_NOTE,
             connector => InstId,
             state => redact(State)
         }
@@ -329,7 +332,7 @@ on_query(
         {error, #{status_code := StatusCode}} ->
             ?SLOG(error, #{
                 msg => "http_connector_do_request_received_error_response",
-                note => "the body will be redacted due to security reasons",
+                note => ?READACT_REQUEST_NOTE,
                 request => redact_request(NRequest),
                 connector => InstId,
                 status_code => StatusCode
@@ -338,7 +341,8 @@ on_query(
         {error, Reason} ->
             ?SLOG(error, #{
                 msg => "http_connector_do_request_failed",
-                request => redact(NRequest),
+                note => ?READACT_REQUEST_NOTE,
+                request => redact_request(NRequest),
                 reason => Reason,
                 connector => InstId
             }),
@@ -379,7 +383,8 @@ on_query_async(
         "QUERY_ASYNC",
         "http_connector_received",
         #{
-            request => redact(Request),
+            request => redact_request(Request),
+            note => ?READACT_REQUEST_NOTE,
             connector => InstId,
             state => redact(State)
         }

+ 2 - 2
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -32,9 +32,9 @@
 -define(kafka_producers, kafka_producers).
 
 query_mode(#{kafka := #{query_mode := sync}}) ->
-    simple_sync;
+    simple_sync_internal_buffer;
 query_mode(_) ->
-    simple_async.
+    simple_async_internal_buffer.
 
 callback_mode() -> async_if_possible.
 

+ 2 - 2
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl

@@ -133,7 +133,7 @@ t_query_mode(CtConfig) ->
         end,
         fun(Trace) ->
             %% We should have a sync Snabbkaffe trace
-            ?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_sync_query, Trace))
+            ?assertMatch([_], ?of_kind(simple_sync_internal_buffer_query, Trace))
         end
     ),
     ?check_trace(
@@ -141,7 +141,7 @@ t_query_mode(CtConfig) ->
             publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "async"})
         end,
         fun(Trace) ->
-            %% We should have a sync Snabbkaffe trace
+            %% We should have an async Snabbkaffe trace
             ?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_async_query, Trace))
         end
     ),

+ 1 - 1
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_pulsar, [
     {description, "EMQX Pulsar Bridge"},
-    {vsn, "0.1.6"},
+    {vsn, "0.1.7"},
     {registered, []},
     {applications, [
         kernel,

+ 1 - 1
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl

@@ -73,7 +73,7 @@
 callback_mode() -> async_if_possible.
 
 query_mode(_Config) ->
-    simple_async.
+    simple_async_internal_buffer.
 
 -spec on_start(resource_id(), config()) -> {ok, state()}.
 on_start(InstanceId, Config) ->

+ 4 - 1
apps/emqx_gcp_device/src/emqx_gcp_device.erl

@@ -8,7 +8,10 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("stdlib/include/ms_transform.hrl").
 
--define(AUTHN_SHARD, ?MODULE).
+%% NOTE
+%% We share the shard with `emqx_auth_mnesia` to ensure backward compatibility
+%% with EMQX 5.2.x and earlier.
+-define(AUTHN_SHARD, emqx_authn_shard).
 
 %% Management
 -export([put_device/1, get_device/1, remove_device/1]).

+ 10 - 4
apps/emqx_resource/include/emqx_resource.hrl

@@ -22,11 +22,18 @@
 -type resource_state() :: term().
 -type resource_status() :: connected | disconnected | connecting | stopped.
 -type callback_mode() :: always_sync | async_if_possible.
--type query_mode() :: simple_sync | simple_async | sync | async | no_queries.
+-type query_mode() ::
+    simple_sync
+    | simple_async
+    | simple_sync_internal_buffer
+    | simple_async_internal_buffer
+    | sync
+    | async
+    | no_queries.
 -type result() :: term().
 -type reply_fun() ::
-    {fun((result(), Args :: term()) -> any()), Args :: term()}
-    | {fun((result(), Args :: term()) -> any()), Args :: term(), reply_context()}
+    {fun((...) -> any()), Args :: [term()]}
+    | {fun((...) -> any()), Args :: [term()], reply_context()}
     | undefined.
 -type reply_context() :: #{reply_dropped => boolean()}.
 -type query_opts() :: #{
@@ -36,7 +43,6 @@
     expire_at => infinity | integer(),
     async_reply_fun => reply_fun(),
     simple_query => boolean(),
-    is_buffer_supported => boolean(),
     reply_to => reply_fun()
 }.
 -type resource_data() :: #{

+ 14 - 0
apps/emqx_resource/src/emqx_resource.erl

@@ -311,6 +311,20 @@ query(ResId, Request, Opts) ->
                     %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
                     %% so the buffer worker does not need to lookup the cache again
                     emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts);
+                {simple_async_internal_buffer, _} ->
+                    %% This is for bridges/connectors that have internal buffering, such
+                    %% as Kafka and Pulsar producers.
+                    %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
+                    %% so the buffer worker does not need to lookup the cache again
+                    emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts);
+                {simple_sync_internal_buffer, _} ->
+                    %% This is for bridges/connectors that have internal buffering, such
+                    %% as Kafka and Pulsar producers.
+                    %% TODO(5.1.1): pass Resource instead of ResId to simple APIs
+                    %% so the buffer worker does not need to lookup the cache again
+                    emqx_resource_buffer_worker:simple_sync_internal_buffer_query(
+                        ResId, Request, Opts
+                    );
                 {sync, _} ->
                     emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
                 {async, _} ->

+ 48 - 3
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -39,7 +39,8 @@
 -export([
     simple_sync_query/2,
     simple_sync_query/3,
-    simple_async_query/3
+    simple_async_query/3,
+    simple_sync_internal_buffer_query/3
 ]).
 
 -export([
@@ -53,7 +54,9 @@
 
 -export([queue_item_marshaller/1, estimate_size/1]).
 
--export([handle_async_reply/2, handle_async_batch_reply/2, reply_call/2]).
+-export([
+    handle_async_reply/2, handle_async_batch_reply/2, reply_call/2, reply_call_internal_buffer/3
+]).
 
 -export([clear_disk_queue_dir/2]).
 
@@ -169,6 +172,42 @@ simple_async_query(Id, Request, QueryOpts0) ->
     _ = handle_query_result(Id, Result, _HasBeenSent = false),
     Result.
 
+%% This is a hack to handle cases where the underlying connector has internal buffering
+%% (e.g.: Kafka and Pulsar producers).  Since the message may be inernally retried at a
+%% later time, we can't bump metrics immediatelly if the return value is not a success
+%% (e.g.: if the call timed out, but the message was enqueued nevertheless).
+-spec simple_sync_internal_buffer_query(id(), request(), query_opts()) -> term().
+simple_sync_internal_buffer_query(Id, Request, QueryOpts0) ->
+    ?tp(simple_sync_internal_buffer_query, #{id => Id, request => Request, query_opts => QueryOpts0}),
+    ReplyAlias = alias([reply]),
+    try
+        MaybeReplyTo = maps:get(reply_to, QueryOpts0, undefined),
+        QueryOpts1 = QueryOpts0#{
+            reply_to => {fun ?MODULE:reply_call_internal_buffer/3, [ReplyAlias, MaybeReplyTo]}
+        },
+        QueryOpts = #{timeout := Timeout} = maps:merge(simple_query_opts(), QueryOpts1),
+        case simple_async_query(Id, Request, QueryOpts) of
+            {error, _} = Error ->
+                Error;
+            {async_return, {error, _} = Error} ->
+                Error;
+            {async_return, {ok, _Pid}} ->
+                receive
+                    {ReplyAlias, Response} ->
+                        Response
+                after Timeout ->
+                    _ = unalias(ReplyAlias),
+                    receive
+                        {ReplyAlias, Response} ->
+                            Response
+                    after 0 -> {error, timeout}
+                    end
+                end
+        end
+    after
+        _ = unalias(ReplyAlias)
+    end.
+
 simple_query_opts() ->
     ensure_expire_at(#{simple_query => true, timeout => infinity}).
 
@@ -1049,7 +1088,7 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
     end.
 
 do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when
-    ResQM =:= simple_async; ResQM =:= simple_sync
+    ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer
 ->
     %% The connector supports buffer, send even in disconnected state
     #{mod := Mod, state := ResSt, callback_mode := CBM} = Resource,
@@ -1908,6 +1947,12 @@ reply_call(Alias, Response) ->
     erlang:send(Alias, {Alias, Response}),
     ok.
 
+%% Used by `simple_sync_internal_buffer_query' to reply and chain existing `reply_to'
+%% callbacks.
+reply_call_internal_buffer(ReplyAlias, MaybeReplyTo, Response) ->
+    ?MODULE:reply_call(ReplyAlias, Response),
+    do_reply_caller(MaybeReplyTo, Response).
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 adjust_batch_time_test_() ->

+ 2 - 2
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -147,9 +147,9 @@ create(ResId, Group, ResourceType, Config, Opts) ->
     QueryMode = emqx_resource:query_mode(ResourceType, Config, Opts),
     case QueryMode of
         %% the resource has built-in buffer, so there is no need for resource workers
-        simple_sync ->
+        simple_sync_internal_buffer ->
             ok;
-        simple_async ->
+        simple_async_internal_buffer ->
             ok;
         %% The resource is a consumer resource, so there is no need for resource workers
         no_queries ->

+ 2 - 0
changes/ce/fix-11747.en.md

@@ -0,0 +1,2 @@
+Update QUIC stack to msquic 2.2.3.
+

+ 2 - 0
changes/ce/fix-11747.zh.md

@@ -0,0 +1,2 @@
+更新 QUIC 栈至 msquic 2.2.3
+

+ 1 - 0
changes/ee/fix-11724.en.md

@@ -0,0 +1 @@
+Fixed a metrics issue where messages sent to Kafka would count as failed even when they were successfully sent late due to its internal buffering.

+ 1 - 0
changes/ee/fix-11733.en.md

@@ -0,0 +1 @@
+Resolved an incompatibility issue that led to crashes during session takeover / channel eviction when the session was residing on a remote node running EMQX v5.2.x or earlier.

+ 1 - 0
changes/ee/fix-11750.en.md

@@ -0,0 +1 @@
+Eliminated logging and tracing of HTTP request bodies in HTTP authentification and HTTP bridges.

+ 1 - 0
changes/ee/fix-11760.en.md

@@ -0,0 +1 @@
+Simplified the CQL query employed for Cassandra bridge health check that was apparently the source of warnings in Cassandra server logs.

+ 1 - 1
mix.exs

@@ -821,7 +821,7 @@ defmodule EMQXUmbrella.MixProject do
   defp quicer_dep() do
     if enable_quicer?(),
       # in conflict with emqx and emqtt
-      do: [{:quicer, github: "emqx/quic", tag: "0.0.201", override: true}],
+      do: [{:quicer, github: "emqx/quic", tag: "0.0.202", override: true}],
       else: []
   end
 

+ 1 - 1
rebar.config.erl

@@ -39,7 +39,7 @@ bcrypt() ->
     {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.1"}}}.
 
 quicer() ->
-    {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.201"}}}.
+    {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.202"}}}.
 
 jq() ->
     {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.10"}}}.