
Merge remote-tracking branch 'upstream/release-57' into 20240731-sync-release-57

Ivan Dyachkov, 1 year ago
parent
commit
74cef7937d
44 changed files, with 888 additions and 319 deletions
  1. apps/emqx/include/logger.hrl (+6 -2)
  2. apps/emqx/src/emqx_log_throttler.erl (+56 -16)
  3. apps/emqx/test/emqx_log_throttler_SUITE.erl (+65 -13)
  4. apps/emqx_auth_http/include/emqx_auth_http.hrl (+2 -0)
  5. apps/emqx_auth_http/src/emqx_auth_http_app.erl (+1 -1)
  6. apps/emqx_auth_http/src/emqx_authn_http.erl (+35 -32)
  7. apps/emqx_auth_http/src/emqx_authn_scram_http.erl (+23 -41)
  8. apps/emqx_auth_http/src/emqx_authn_scram_http_schema.erl (+8 -8)
  9. apps/emqx_auth_http/test/emqx_authn_scram_http_SUITE.erl (+139 -68)
  10. apps/emqx_auth_http/test/emqx_authn_scram_http_test_server.erl (+1 -1)
  11. apps/emqx_auth_mnesia/src/emqx_authn_scram_mnesia.erl (+3 -1)
  12. apps/emqx_bridge_azure_event_hub/mix.exs (+1 -1)
  13. apps/emqx_bridge_azure_event_hub/rebar.config (+1 -1)
  14. apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl (+27 -8)
  15. apps/emqx_bridge_confluent/mix.exs (+1 -1)
  16. apps/emqx_bridge_confluent/rebar.config (+1 -1)
  17. apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl (+27 -8)
  18. apps/emqx_bridge_kafka/mix.exs (+1 -1)
  19. apps/emqx_bridge_kafka/rebar.config (+1 -1)
  20. apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl (+30 -5)
  21. apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl (+114 -51)
  22. apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl (+12 -1)
  23. apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl (+3 -3)
  24. apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl (+162 -6)
  25. apps/emqx_conf/src/emqx_conf_schema.erl (+2 -1)
  26. apps/emqx_conf/src/emqx_conf_schema_inject.erl (+1 -1)
  27. apps/emqx_gateway/src/emqx_gateway_api_authn.erl (+1 -1)
  28. apps/emqx_plugins/src/emqx_plugins.erl (+39 -2)
  29. apps/emqx_resource/src/emqx_resource_buffer_worker.erl (+54 -24)
  30. apps/emqx_rule_engine/src/emqx_rule_sqltester.erl (+5 -3)
  31. apps/emqx_rule_engine/test/emqx_rule_engine_api_2_SUITE.erl (+33 -0)
  32. apps/emqx_utils/src/emqx_utils_scram.erl (+5 -7)
  33. apps/emqx_utils/src/emqx_variform_bif.erl (+1 -1)
  34. apps/emqx_utils/test/emqx_variform_bif_tests.erl (+1 -1)
  35. changes/ce/feat-13507.en.md (+4 -2)
  36. changes/ce/feat-13528.en.md (+1 -0)
  37. changes/ce/feat-13548.en.md (+6 -0)
  38. changes/ce/fix-13527.en.md (+1 -0)
  39. changes/ee/feat-13452.en.md (+5 -0)
  40. changes/ee/feat-13504.en.md (+4 -0)
  41. mix.exs (+1 -1)
  42. rel/i18n/emqx_bridge_azure_event_hub.hocon (+1 -1)
  43. rel/i18n/emqx_bridge_confluent_producer.hocon (+2 -2)
  44. rel/i18n/emqx_bridge_kafka.hocon (+1 -1)

+ 6 - 2
apps/emqx/include/logger.hrl

@@ -41,16 +41,20 @@
 ).
 
 %% NOTE: do not forget to use atom for msg and add every used msg to
-%% the default value of `log.thorttling.msgs` list.
+%% the default value of `log.throttling.msgs` list.
 -define(SLOG_THROTTLE(Level, Data),
     ?SLOG_THROTTLE(Level, Data, #{})
 ).
 
 -define(SLOG_THROTTLE(Level, Data, Meta),
+    ?SLOG_THROTTLE(Level, undefined, Data, Meta)
+).
+
+-define(SLOG_THROTTLE(Level, UniqueKey, Data, Meta),
     case logger:allow(Level, ?MODULE) of
         true ->
             (fun(#{msg := __Msg} = __Data) ->
-                case emqx_log_throttler:allow(__Msg) of
+                case emqx_log_throttler:allow(__Msg, UniqueKey) of
                     true ->
                         logger:log(Level, __Data, Meta);
                     false ->

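Review note: the new 4-arity `?SLOG_THROTTLE` threads a per-instance `UniqueKey` through to `emqx_log_throttler:allow/2`, while the 2- and 3-arity forms keep the old behaviour by passing `undefined`. A minimal call-site sketch (the function name and the `Id` variable are hypothetical; the msg atom must still be listed in `log.throttling.msgs`):

    %% Sketch: throttle an error per resource instance rather than globally.
    log_unrecoverable_failure(Id, Reason) ->
        ?SLOG_THROTTLE(
            error,
            Id, %% UniqueKey: a binary such as a resource ID, or 'undefined'
            #{msg => unrecoverable_resource_error, reason => Reason},
            #{}
        ).
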
+ 56 - 16
apps/emqx/src/emqx_log_throttler.erl

@@ -25,7 +25,7 @@
 -export([start_link/0]).
 
 %% throttler API
--export([allow/1]).
+-export([allow/2]).
 
 %% gen_server callbacks
 -export([
@@ -40,23 +40,29 @@
 -define(SEQ_ID(Msg), {?MODULE, Msg}).
 -define(NEW_SEQ, atomics:new(1, [{signed, false}])).
 -define(GET_SEQ(Msg), persistent_term:get(?SEQ_ID(Msg), undefined)).
+-define(ERASE_SEQ(Msg), persistent_term:erase(?SEQ_ID(Msg))).
 -define(RESET_SEQ(SeqRef), atomics:put(SeqRef, 1, 0)).
 -define(INC_SEQ(SeqRef), atomics:add(SeqRef, 1, 1)).
 -define(GET_DROPPED(SeqRef), atomics:get(SeqRef, 1) - 1).
 -define(IS_ALLOWED(SeqRef), atomics:add_get(SeqRef, 1, 1) =:= 1).
 
--define(NEW_THROTTLE(Msg, SeqRef), persistent_term:put(?SEQ_ID(Msg), SeqRef)).
-
 -define(MSGS_LIST, emqx:get_config([log, throttling, msgs], [])).
 -define(TIME_WINDOW_MS, timer:seconds(emqx:get_config([log, throttling, time_window], 60))).
 
--spec allow(atom()) -> boolean().
-allow(Msg) when is_atom(Msg) ->
+%% @doc Check if a throttled log message is allowed to pass down to the logger this time.
+%% The Msg has to be an atom, and the second argument `UniqueKey' should be `undefined'
+%% for predefined message IDs.
+%% For relatively static resources created from configuration, such as data integration
+%% resource IDs, `UniqueKey' should be of `binary()' type.
+-spec allow(atom(), undefined | binary()) -> boolean().
+allow(Msg, UniqueKey) when
+    is_atom(Msg) andalso (is_binary(UniqueKey) orelse UniqueKey =:= undefined)
+->
     case emqx_logger:get_primary_log_level() of
         debug ->
             true;
         _ ->
-            do_allow(Msg)
+            do_allow(Msg, UniqueKey)
     end.
 
 -spec start_link() -> startlink_ret().
@@ -68,7 +74,8 @@ start_link() ->
 %%--------------------------------------------------------------------
 
 init([]) ->
-    ok = lists:foreach(fun(Msg) -> ?NEW_THROTTLE(Msg, ?NEW_SEQ) end, ?MSGS_LIST),
+    process_flag(trap_exit, true),
+    ok = lists:foreach(fun new_throttler/1, ?MSGS_LIST),
     CurrentPeriodMs = ?TIME_WINDOW_MS,
     TimerRef = schedule_refresh(CurrentPeriodMs),
     {ok, #{timer_ref => TimerRef, current_period_ms => CurrentPeriodMs}}.
@@ -86,16 +93,22 @@ handle_info(refresh, #{current_period_ms := PeriodMs} = State) ->
     DroppedStats = lists:foldl(
         fun(Msg, Acc) ->
             case ?GET_SEQ(Msg) of
-                %% Should not happen, unless the static ids list is updated at run-time.
                 undefined ->
-                    ?NEW_THROTTLE(Msg, ?NEW_SEQ),
+                    %% Should not happen, unless the static ids list is updated at run-time.
+                    new_throttler(Msg),
                     ?tp(log_throttler_new_msg, #{throttled_msg => Msg}),
                     Acc;
+                SeqMap when is_map(SeqMap) ->
+                    maps:fold(
+                        fun(Key, Ref, Acc0) ->
+                            ID = iolist_to_binary([atom_to_binary(Msg), $:, Key]),
+                            drop_stats(Ref, ID, Acc0)
+                        end,
+                        Acc,
+                        SeqMap
+                    );
                 SeqRef ->
-                    Dropped = ?GET_DROPPED(SeqRef),
-                    ok = ?RESET_SEQ(SeqRef),
-                    ?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}),
-                    maybe_add_dropped(Msg, Dropped, Acc)
+                    drop_stats(SeqRef, Msg, Acc)
             end
         end,
         #{},
@@ -112,7 +125,16 @@ handle_info(Info, State) ->
    ?SLOG(error, #{msg => "unexpected_info", info => Info}),
     {noreply, State}.
 
+drop_stats(SeqRef, Msg, Acc) ->
+    Dropped = ?GET_DROPPED(SeqRef),
+    ok = ?RESET_SEQ(SeqRef),
+    ?tp(log_throttler_dropped, #{dropped_count => Dropped, throttled_msg => Msg}),
+    maybe_add_dropped(Msg, Dropped, Acc).
+
 terminate(_Reason, _State) ->
+    %% atomics have no delete/remove/release/deallocate API;
+    %% the resource is released once the reference is garbage-collected.
+    lists:foreach(fun(Msg) -> ?ERASE_SEQ(Msg) end, ?MSGS_LIST),
     ok.
 
 code_change(_OldVsn, State, _Extra) ->
@@ -122,17 +144,27 @@ code_change(_OldVsn, State, _Extra) ->
 %% internal functions
 %%--------------------------------------------------------------------
 
-do_allow(Msg) ->
+do_allow(Msg, UniqueKey) ->
     case persistent_term:get(?SEQ_ID(Msg), undefined) of
         undefined ->
             %% This is either a race condition (emqx_log_throttler is not started yet)
             %% or a developer mistake (msg used in ?SLOG_THROTTLE/2,3 macro is
            %% not added to the default value of `log.throttling.msgs`).
-            ?SLOG(info, #{
-                msg => "missing_log_throttle_sequence",
+            ?SLOG(debug, #{
+                msg => "log_throttle_disabled",
                 throttled_msg => Msg
             }),
             true;
+        %% e.g. an unrecoverable_resource_error message throttled per resource ID
+        SeqMap when is_map(SeqMap) ->
+            case maps:find(UniqueKey, SeqMap) of
+                {ok, SeqRef} ->
+                    ?IS_ALLOWED(SeqRef);
+                error ->
+                    SeqRef = ?NEW_SEQ,
+                    new_throttler(Msg, SeqMap#{UniqueKey => SeqRef}),
+                    true
+            end;
         SeqRef ->
             ?IS_ALLOWED(SeqRef)
     end.
@@ -154,3 +186,11 @@ maybe_log_dropped(_DroppedStats, _PeriodMs) ->
 schedule_refresh(PeriodMs) ->
     ?tp(log_throttler_sched_refresh, #{new_period_ms => PeriodMs}),
     erlang:send_after(PeriodMs, ?MODULE, refresh).
+
+new_throttler(unrecoverable_resource_error = Msg) ->
+    new_throttler(Msg, #{});
+new_throttler(Msg) ->
+    new_throttler(Msg, ?NEW_SEQ).
+
+new_throttler(Msg, AtomicOrEmptyMap) ->
+    persistent_term:put(?SEQ_ID(Msg), AtomicOrEmptyMap).

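Review note: the persistent term per msg atom now holds either a single atomics ref (static msgs) or a map of `UniqueKey => atomics ref` (per-resource msgs), which is what the `SeqMap when is_map(SeqMap)` clauses above dispatch on. A short sketch of the observable semantics, assuming both atoms are configured in `log.throttling.msgs` and a throttling window has just started:

    %% Within one window, only the first call per key is allowed:
    true  = emqx_log_throttler:allow(cannot_publish_to_topic_due_to_not_authorized, undefined),
    false = emqx_log_throttler:allow(cannot_publish_to_topic_due_to_not_authorized, undefined),
    %% Per-resource keys are throttled independently of each other:
    true = emqx_log_throttler:allow(unrecoverable_resource_error, <<"resource_a">>),
    true = emqx_log_throttler:allow(unrecoverable_resource_error, <<"resource_b">>).
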
+ 65 - 13
apps/emqx/test/emqx_log_throttler_SUITE.erl

@@ -26,6 +26,7 @@
 %% Have to use real msgs, as the schema is guarded by enum.
 -define(THROTTLE_MSG, authorization_permission_denied).
 -define(THROTTLE_MSG1, cannot_publish_to_topic_due_to_not_authorized).
+-define(THROTTLE_UNRECOVERABLE_MSG, unrecoverable_resource_error).
 -define(TIME_WINDOW, <<"1s">>).
 
 all() -> emqx_common_test_helpers:all(?MODULE).
@@ -59,6 +60,11 @@ end_per_suite(Config) ->
     emqx_cth_suite:stop(?config(suite_apps, Config)),
     emqx_config:delete_override_conf_files().
 
+init_per_testcase(t_throttle_recoverable_msg, Config) ->
+    ok = snabbkaffe:start_trace(),
+    [?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]),
+    {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_UNRECOVERABLE_MSG | Conf], #{}),
+    Config;
 init_per_testcase(t_throttle_add_new_msg, Config) ->
     ok = snabbkaffe:start_trace(),
     [?THROTTLE_MSG] = Conf = emqx:get_config([log, throttling, msgs]),
@@ -72,6 +78,10 @@ init_per_testcase(_TC, Config) ->
     ok = snabbkaffe:start_trace(),
     Config.
 
+end_per_testcase(t_throttle_recoverable_msg, _Config) ->
+    ok = snabbkaffe:stop(),
+    {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}),
+    ok;
 end_per_testcase(t_throttle_add_new_msg, _Config) ->
     ok = snabbkaffe:stop(),
     {ok, _} = emqx_conf:update([log, throttling, msgs], [?THROTTLE_MSG], #{}),
@@ -101,8 +111,8 @@ t_throttle(_Config) ->
                 5000
             ),
 
-            ?assert(emqx_log_throttler:allow(?THROTTLE_MSG)),
-            ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)),
+            ?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
+            ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
             {ok, _} = ?block_until(
                 #{
                     ?snk_kind := log_throttler_dropped,
@@ -115,14 +125,48 @@ t_throttle(_Config) ->
         []
     ).
 
+t_throttle_recoverable_msg(_Config) ->
+    ResourceId = <<"resource_id">>,
+    ThrottledMsg = iolist_to_binary([atom_to_list(?THROTTLE_UNRECOVERABLE_MSG), ":", ResourceId]),
+    ?check_trace(
+        begin
+            %% Warm-up and block to increase the probability that next events
+            %% will be in the same throttling time window.
+            {ok, _} = ?block_until(
+                #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_UNRECOVERABLE_MSG},
+                5000
+            ),
+            {_, {ok, _}} = ?wait_async_action(
+                events(?THROTTLE_UNRECOVERABLE_MSG, ResourceId),
+                #{
+                    ?snk_kind := log_throttler_dropped,
+                    throttled_msg := ThrottledMsg
+                },
+                5000
+            ),
+
+            ?assert(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)),
+            ?assertNot(emqx_log_throttler:allow(?THROTTLE_UNRECOVERABLE_MSG, ResourceId)),
+            {ok, _} = ?block_until(
+                #{
+                    ?snk_kind := log_throttler_dropped,
+                    throttled_msg := ThrottledMsg,
+                    dropped_count := 1
+                },
+                3000
+            )
+        end,
+        []
+    ).
+
 t_throttle_add_new_msg(_Config) ->
     ?check_trace(
         begin
             {ok, _} = ?block_until(
                 #{?snk_kind := log_throttler_new_msg, throttled_msg := ?THROTTLE_MSG1}, 5000
             ),
-            ?assert(emqx_log_throttler:allow(?THROTTLE_MSG1)),
-            ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1)),
+            ?assert(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)),
+            ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG1, undefined)),
             {ok, _} = ?block_until(
                 #{
                     ?snk_kind := log_throttler_dropped,
@@ -137,10 +181,15 @@ t_throttle_add_new_msg(_Config) ->
 
 t_throttle_no_msg(_Config) ->
     %% Must simply pass with no crashes
-    ?assert(emqx_log_throttler:allow(no_test_throttle_msg)),
-    ?assert(emqx_log_throttler:allow(no_test_throttle_msg)),
-    timer:sleep(10),
-    ?assert(erlang:is_process_alive(erlang:whereis(emqx_log_throttler))).
+    Pid = erlang:whereis(emqx_log_throttler),
+    ?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)),
+    ?assert(emqx_log_throttler:allow(no_test_throttle_msg, undefined)),
+    %% assert process is not restarted
+    ?assertEqual(Pid, erlang:whereis(emqx_log_throttler)),
+    %% make a gen_server call to ensure the process is alive
+    %% note: this call results in an 'unexpected_call' error log.
+    ?assertEqual(ignored, gen_server:call(Pid, probe)),
+    ok.
 
 t_update_time_window(_Config) ->
     ?check_trace(
@@ -168,8 +217,8 @@ t_throttle_debug_primary_level(_Config) ->
                 #{?snk_kind := log_throttler_dropped, throttled_msg := ?THROTTLE_MSG},
                 5000
             ),
-            ?assert(emqx_log_throttler:allow(?THROTTLE_MSG)),
-            ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG)),
+            ?assert(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
+            ?assertNot(emqx_log_throttler:allow(?THROTTLE_MSG, undefined)),
             {ok, _} = ?block_until(
                 #{
                     ?snk_kind := log_throttler_dropped,
@@ -187,10 +236,13 @@ t_throttle_debug_primary_level(_Config) ->
 %%--------------------------------------------------------------------
 
 events(Msg) ->
-    events(100, Msg).
+    events(100, Msg, undefined).
+
+events(Msg, Id) ->
+    events(100, Msg, Id).
 
-events(N, Msg) ->
-    [emqx_log_throttler:allow(Msg) || _ <- lists:seq(1, N)].
+events(N, Msg, Id) ->
+    [emqx_log_throttler:allow(Msg, Id) || _ <- lists:seq(1, N)].
 
 module_exists(Mod) ->
     case erlang:module_loaded(Mod) of

+ 2 - 0
apps/emqx_auth_http/include/emqx_auth_http.hrl

@@ -31,4 +31,6 @@
 -define(AUTHN_TYPE, {?AUTHN_MECHANISM, ?AUTHN_BACKEND}).
 -define(AUTHN_TYPE_SCRAM, {?AUTHN_MECHANISM_SCRAM, ?AUTHN_BACKEND}).
 
+-define(AUTHN_DATA_FIELDS, [is_superuser, client_attrs, expire_at, acl]).
+
 -endif.

+ 1 - 1
apps/emqx_auth_http/src/emqx_auth_http_app.erl

@@ -25,7 +25,7 @@
 start(_StartType, _StartArgs) ->
     ok = emqx_authz:register_source(?AUTHZ_TYPE, emqx_authz_http),
     ok = emqx_authn:register_provider(?AUTHN_TYPE, emqx_authn_http),
-    ok = emqx_authn:register_provider(?AUTHN_TYPE_SCRAM, emqx_authn_scram_http),
+    ok = emqx_authn:register_provider(?AUTHN_TYPE_SCRAM, emqx_authn_scram_restapi),
     {ok, Sup} = emqx_auth_http_sup:start_link(),
     {ok, Sup}.
 

+ 35 - 32
apps/emqx_auth_http/src/emqx_authn_http.erl

@@ -32,7 +32,9 @@
     with_validated_config/2,
     generate_request/2,
     request_for_log/2,
-    response_for_log/1
+    response_for_log/1,
+    extract_auth_data/2,
+    safely_parse_body/2
 ]).
 
 -define(DEFAULT_CONTENT_TYPE, <<"application/json">>).
@@ -194,34 +196,14 @@ handle_response(Headers, Body) ->
     case safely_parse_body(ContentType, Body) of
         {ok, NBody} ->
             body_to_auth_data(NBody);
-        {error, Reason} ->
-            ?TRACE_AUTHN_PROVIDER(
-                error,
-                "parse_http_response_failed",
-                #{content_type => ContentType, body => Body, reason => Reason}
-            ),
+        {error, _Reason} ->
             ignore
     end.
 
 body_to_auth_data(Body) ->
     case maps:get(<<"result">>, Body, <<"ignore">>) of
         <<"allow">> ->
-            IsSuperuser = emqx_authn_utils:is_superuser(Body),
-            Attrs = emqx_authn_utils:client_attrs(Body),
-            try
-                ExpireAt = expire_at(Body),
-                ACL = acl(ExpireAt, Body),
-                Result = merge_maps([ExpireAt, IsSuperuser, ACL, Attrs]),
-                {ok, Result}
-            catch
-                throw:{bad_acl_rule, Reason} ->
-                    %% it's a invalid token, so ok to log
-                    ?TRACE_AUTHN_PROVIDER("bad_acl_rule", Reason#{http_body => Body}),
-                    {error, bad_username_or_password};
-                throw:Reason ->
-                    ?TRACE_AUTHN_PROVIDER("bad_response_body", Reason#{http_body => Body}),
-                    {error, bad_username_or_password}
-            end;
+            extract_auth_data(http, Body);
         <<"deny">> ->
             {error, not_authorized};
         <<"ignore">> ->
@@ -230,6 +212,24 @@ body_to_auth_data(Body) ->
             ignore
     end.
 
+extract_auth_data(Source, Body) ->
+    IsSuperuser = emqx_authn_utils:is_superuser(Body),
+    Attrs = emqx_authn_utils:client_attrs(Body),
+    try
+        ExpireAt = expire_at(Body),
+        ACL = acl(ExpireAt, Source, Body),
+        Result = merge_maps([ExpireAt, IsSuperuser, ACL, Attrs]),
+        {ok, Result}
+    catch
+        throw:{bad_acl_rule, Reason} ->
+            %% it's a invalid token, so ok to log
+            ?TRACE_AUTHN_PROVIDER("bad_acl_rule", Reason#{http_body => Body}),
+            {error, bad_username_or_password};
+        throw:Reason ->
+            ?TRACE_AUTHN_PROVIDER("bad_response_body", Reason#{http_body => Body}),
+            {error, bad_username_or_password}
+    end.
+
 merge_maps([]) -> #{};
 merge_maps([Map | Maps]) -> maps:merge(Map, merge_maps(Maps)).
 
@@ -268,40 +268,43 @@ expire_sec(#{<<"expire_at">> := _}) ->
 expire_sec(_) ->
     undefined.
 
-acl(#{expire_at := ExpireTimeMs}, #{<<"acl">> := Rules}) ->
+acl(#{expire_at := ExpireTimeMs}, Source, #{<<"acl">> := Rules}) ->
     #{
         acl => #{
-            source_for_logging => http,
+            source_for_logging => Source,
             rules => emqx_authz_rule_raw:parse_and_compile_rules(Rules),
             %% It's seconds level precision (like JWT) for authz
             %% see emqx_authz_client_info:check/1
             expire => erlang:convert_time_unit(ExpireTimeMs, millisecond, second)
         }
     };
-acl(_NoExpire, #{<<"acl">> := Rules}) ->
+acl(_NoExpire, Source, #{<<"acl">> := Rules}) ->
     #{
         acl => #{
-            source_for_logging => http,
+            source_for_logging => Source,
             rules => emqx_authz_rule_raw:parse_and_compile_rules(Rules)
         }
     };
-acl(_, _) ->
+acl(_, _, _) ->
     #{}.
 
 safely_parse_body(ContentType, Body) ->
     try
         parse_body(ContentType, Body)
     catch
-        _Class:_Reason ->
+        _Class:Reason ->
+            ?TRACE_AUTHN_PROVIDER(
+                error,
+                "parse_http_response_failed",
+                #{content_type => ContentType, body => Body, reason => Reason}
+            ),
             {error, invalid_body}
     end.
 
 parse_body(<<"application/json", _/binary>>, Body) ->
     {ok, emqx_utils_json:decode(Body, [return_maps])};
 parse_body(<<"application/x-www-form-urlencoded", _/binary>>, Body) ->
-    Flags = [<<"result">>, <<"is_superuser">>],
-    RawMap = maps:from_list(cow_qs:parse_qs(Body)),
-    NBody = maps:with(Flags, RawMap),
+    NBody = maps:from_list(cow_qs:parse_qs(Body)),
     {ok, NBody};
 parse_body(ContentType, _) ->
     {error, {unsupported_content_type, ContentType}}.
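Review note: `extract_auth_data/2` and `safely_parse_body/2` are now exported so the SCRAM REST API authenticator can reuse them. A hedged sketch of a decoded `allow` response body they can consume (all values are illustrative; `result` itself is checked by the caller, and the ACL rule shape follows `emqx_authz_rule_raw`):

    Body = #{
        <<"result">> => <<"allow">>,
        <<"is_superuser">> => false,
        <<"client_attrs">> => #{<<"tenant">> => <<"t1">>},
        <<"expire_at">> => 1735689600, %% epoch seconds, see expire_sec/1
        <<"acl">> => [
            #{<<"permission">> => <<"allow">>,
              <<"action">> => <<"subscribe">>,
              <<"topic">> => <<"t/#">>}
        ]
    },
    {ok, AuthData} = emqx_authn_http:extract_auth_data(http, Body).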

+ 23 - 41
apps/emqx_auth_http/src/emqx_authn_scram_http.erl

@@ -2,10 +2,19 @@
 %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 
--module(emqx_authn_scram_http).
+%% Note:
+%% This is not an implementation of RFC 7804:
+%%   Salted Challenge Response HTTP Authentication Mechanism.
+%% This backend is an implementation of SCRAM
+%% that uses an external web resource as the source of user information.
 
--include_lib("emqx_auth/include/emqx_authn.hrl").
+-module(emqx_authn_scram_restapi).
+
+-feature(maybe_expr, enable).
+
+-include("emqx_auth_http.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_auth/include/emqx_authn.hrl").
 
 -behaviour(emqx_authn_provider).
 
@@ -22,10 +31,6 @@
     <<"salt">>
 ]).
 
--define(OPTIONAL_USER_INFO_KEYS, [
-    <<"is_superuser">>
-]).
-
 %%------------------------------------------------------------------------------
 %% APIs
 %%------------------------------------------------------------------------------
@@ -72,7 +77,9 @@ authenticate(
             reason => Reason
         })
     end,
-    emqx_utils_scram:authenticate(AuthMethod, AuthData, AuthCache, RetrieveFun, OnErrFun, State);
+    emqx_utils_scram:authenticate(
+        AuthMethod, AuthData, AuthCache, State, RetrieveFun, OnErrFun, ?AUTHN_DATA_FIELDS
+    );
 authenticate(_Credential, _State) ->
     ignore.
 
@@ -95,7 +102,7 @@ retrieve(
 ) ->
     Request = emqx_authn_http:generate_request(Credential#{username := Username}, State),
     Response = emqx_resource:simple_sync_query(ResourceId, {Method, Request, RequestTimeout}),
-    ?TRACE_AUTHN_PROVIDER("scram_http_response", #{
+    ?TRACE_AUTHN_PROVIDER("scram_restapi_response", #{
         request => emqx_authn_http:request_for_log(Credential, State),
         response => emqx_authn_http:response_for_log(Response),
         resource => ResourceId
@@ -113,16 +120,11 @@ retrieve(
 
 handle_response(Headers, Body) ->
     ContentType = proplists:get_value(<<"content-type">>, Headers),
-    case safely_parse_body(ContentType, Body) of
-        {ok, NBody} ->
-            body_to_user_info(NBody);
-        {error, Reason} = Error ->
-            ?TRACE_AUTHN_PROVIDER(
-                error,
-                "parse_scram_http_response_failed",
-                #{content_type => ContentType, body => Body, reason => Reason}
-            ),
-            Error
+    maybe
+        {ok, NBody} ?= emqx_authn_http:safely_parse_body(ContentType, Body),
+        {ok, UserInfo} ?= body_to_user_info(NBody),
+        {ok, AuthData} ?= emqx_authn_http:extract_auth_data(scram_restapi, NBody),
+        {ok, maps:merge(AuthData, UserInfo)}
     end.
 
 body_to_user_info(Body) ->
@@ -131,26 +133,16 @@ body_to_user_info(Body) ->
         true ->
             case safely_convert_hex(Required0) of
                 {ok, Required} ->
-                    UserInfo0 = maps:merge(Required, maps:with(?OPTIONAL_USER_INFO_KEYS, Body)),
-                    UserInfo1 = emqx_utils_maps:safe_atom_key_map(UserInfo0),
-                    UserInfo = maps:merge(#{is_superuser => false}, UserInfo1),
-                    {ok, UserInfo};
+                    {ok, emqx_utils_maps:safe_atom_key_map(Required)};
                 Error ->
+                    ?TRACE_AUTHN_PROVIDER("decode_keys_failed", #{http_body => Body}),
                     Error
             end;
         _ ->
-            ?TRACE_AUTHN_PROVIDER("bad_response_body", #{http_body => Body}),
+            ?TRACE_AUTHN_PROVIDER("missing_required_keys", #{http_body => Body}),
             {error, bad_response}
     end.
 
-safely_parse_body(ContentType, Body) ->
-    try
-        parse_body(ContentType, Body)
-    catch
-        _Class:_Reason ->
-            {error, invalid_body}
-    end.
-
 safely_convert_hex(Required) ->
     try
         {ok,
@@ -165,15 +157,5 @@ safely_convert_hex(Required) ->
             {error, Reason}
     end.
 
-parse_body(<<"application/json", _/binary>>, Body) ->
-    {ok, emqx_utils_json:decode(Body, [return_maps])};
-parse_body(<<"application/x-www-form-urlencoded", _/binary>>, Body) ->
-    Flags = ?REQUIRED_USER_INFO_KEYS ++ ?OPTIONAL_USER_INFO_KEYS,
-    RawMap = maps:from_list(cow_qs:parse_qs(Body)),
-    NBody = maps:with(Flags, RawMap),
-    {ok, NBody};
-parse_body(ContentType, _) ->
-    {error, {unsupported_content_type, ContentType}}.
-
 merge_scram_conf(Conf, State) ->
     maps:merge(maps:with([algorithm, iteration_count], Conf), State).
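Review note: the required SCRAM fields must arrive hex-encoded (decoded by `safely_convert_hex/1`), while optional fields now flow through the shared `?AUTHN_DATA_FIELDS` extraction instead of a local whitelist. A hedged sketch of a user-info body returned by the web resource (the hex strings are placeholders, of the kind produced by `esasl_scram:generate_authentication_info/2` in the test suite):

    #{
        <<"stored_key">> => <<"6B3B...">>, %% hex-encoded
        <<"server_key">> => <<"9A41...">>, %% hex-encoded
        <<"salt">> => <<"F1D2...">>,       %% hex-encoded
        <<"is_superuser">> => false        %% one of the ?AUTHN_DATA_FIELDS
    }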

+ 8 - 8
apps/emqx_auth_http/src/emqx_authn_scram_http_schema.erl

@@ -2,7 +2,7 @@
 %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 
--module(emqx_authn_scram_http_schema).
+-module(emqx_authn_scram_restapi_schema).
 
 -behaviour(emqx_authn_schema).
 
@@ -22,16 +22,16 @@
 namespace() -> "authn".
 
 refs() ->
-    [?R_REF(scram_http_get), ?R_REF(scram_http_post)].
+    [?R_REF(scram_restapi_get), ?R_REF(scram_restapi_post)].
 
 select_union_member(
     #{<<"mechanism">> := ?AUTHN_MECHANISM_SCRAM_BIN, <<"backend">> := ?AUTHN_BACKEND_BIN} = Value
 ) ->
     case maps:get(<<"method">>, Value, undefined) of
         <<"get">> ->
-            [?R_REF(scram_http_get)];
+            [?R_REF(scram_restapi_get)];
         <<"post">> ->
-            [?R_REF(scramm_http_post)];
+            [?R_REF(scram_restapi_post)];
         Else ->
             throw(#{
                 reason => "unknown_http_method",
@@ -43,20 +43,20 @@ select_union_member(
 select_union_member(_Value) ->
     undefined.
 
-fields(scram_http_get) ->
+fields(scram_restapi_get) ->
     [
         {method, #{type => get, required => true, desc => ?DESC(emqx_authn_http_schema, method)}},
         {headers, fun emqx_authn_http_schema:headers_no_content_type/1}
     ] ++ common_fields();
-fields(scram_http_post) ->
+fields(scram_restapi_post) ->
     [
         {method, #{type => post, required => true, desc => ?DESC(emqx_authn_http_schema, method)}},
         {headers, fun emqx_authn_http_schema:headers/1}
     ] ++ common_fields().
 
-desc(scram_http_get) ->
+desc(scram_restapi_get) ->
     ?DESC(emqx_authn_http_schema, get);
-desc(scram_http_post) ->
+desc(scram_restapi_post) ->
     ?DESC(emqx_authn_http_schema, post);
 desc(_) ->
     undefined.

+ 139 - 68
apps/emqx_auth_http/test/emqx_authn_scram_http_SUITE.erl

@@ -2,7 +2,7 @@
 %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 
--module(emqx_authn_scram_http_SUITE).
+-module(emqx_authn_scram_restapi_SUITE).
 
 -compile(export_all).
 -compile(nowarn_export_all).
@@ -21,6 +21,9 @@
 -define(ALGORITHM_STR, <<"sha512">>).
 -define(ITERATION_COUNT, 4096).
 
+-define(T_ACL_USERNAME, <<"username">>).
+-define(T_ACL_PASSWORD, <<"password">>).
+
 -include_lib("emqx/include/emqx_placeholder.hrl").
 
 all() ->
@@ -54,11 +57,11 @@ init_per_testcase(_Case, Config) ->
         [authentication],
         ?GLOBAL
     ),
-    {ok, _} = emqx_authn_scram_http_test_server:start_link(?HTTP_PORT, ?HTTP_PATH),
+    {ok, _} = emqx_authn_scram_restapi_test_server:start_link(?HTTP_PORT, ?HTTP_PATH),
     Config.
 
 end_per_testcase(_Case, _Config) ->
-    ok = emqx_authn_scram_http_test_server:stop().
+    ok = emqx_authn_scram_restapi_test_server:stop().
 
 %%------------------------------------------------------------------------------
 %% Tests
@@ -72,7 +75,9 @@ t_create(_Config) ->
         {create_authenticator, ?GLOBAL, AuthConfig}
     ),
 
-    {ok, [#{provider := emqx_authn_scram_http}]} = emqx_authn_chains:list_authenticators(?GLOBAL).
+    {ok, [#{provider := emqx_authn_scram_restapi}]} = emqx_authn_chains:list_authenticators(
+        ?GLOBAL
+    ).
 
 t_create_invalid(_Config) ->
     AuthConfig = raw_config(),
@@ -118,59 +123,8 @@ t_authenticate(_Config) ->
 
     ok = emqx_config:put([mqtt, idle_timeout], 500),
 
-    {ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883),
-
-    ClientFirstMessage = esasl_scram:client_first_message(Username),
-
-    ConnectPacket = ?CONNECT_PACKET(
-        #mqtt_packet_connect{
-            proto_ver = ?MQTT_PROTO_V5,
-            properties = #{
-                'Authentication-Method' => <<"SCRAM-SHA-512">>,
-                'Authentication-Data' => ClientFirstMessage
-            }
-        }
-    ),
-
-    ok = emqx_authn_mqtt_test_client:send(Pid, ConnectPacket),
-
-    %% Intentional sleep to trigger idle timeout for the connection not yet authenticated
-    ok = ct:sleep(1000),
-
-    ?AUTH_PACKET(
-        ?RC_CONTINUE_AUTHENTICATION,
-        #{'Authentication-Data' := ServerFirstMessage}
-    ) = receive_packet(),
-
-    {continue, ClientFinalMessage, ClientCache} =
-        esasl_scram:check_server_first_message(
-            ServerFirstMessage,
-            #{
-                client_first_message => ClientFirstMessage,
-                password => Password,
-                algorithm => ?ALGORITHM
-            }
-        ),
-
-    AuthContinuePacket = ?AUTH_PACKET(
-        ?RC_CONTINUE_AUTHENTICATION,
-        #{
-            'Authentication-Method' => <<"SCRAM-SHA-512">>,
-            'Authentication-Data' => ClientFinalMessage
-        }
-    ),
-
-    ok = emqx_authn_mqtt_test_client:send(Pid, AuthContinuePacket),
-
-    ?CONNACK_PACKET(
-        ?RC_SUCCESS,
-        _,
-        #{'Authentication-Data' := ServerFinalMessage}
-    ) = receive_packet(),
-
-    ok = esasl_scram:check_server_final_message(
-        ServerFinalMessage, ClientCache#{algorithm => ?ALGORITHM}
-    ).
+    {ok, Pid} = create_connection(Username, Password),
+    emqx_authn_mqtt_test_client:stop(Pid).
 
 t_authenticate_bad_props(_Config) ->
     Username = <<"u">>,
@@ -314,6 +268,47 @@ t_destroy(_Config) ->
         _
     ) = receive_packet().
 
+t_acl(_Config) ->
+    init_auth(),
+
+    ACL = emqx_authn_http_SUITE:acl_rules(),
+    set_user_handler(?T_ACL_USERNAME, ?T_ACL_PASSWORD, #{acl => ACL}),
+    {ok, Pid} = create_connection(?T_ACL_USERNAME, ?T_ACL_PASSWORD),
+
+    Cases = [
+        {allow, <<"http-authn-acl/#">>},
+        {deny, <<"http-authn-acl/1">>},
+        {deny, <<"t/#">>}
+    ],
+
+    try
+        lists:foreach(
+            fun(Case) ->
+                test_acl(Case, Pid)
+            end,
+            Cases
+        )
+    after
+        ok = emqx_authn_mqtt_test_client:stop(Pid)
+    end.
+
+t_auth_expire(_Config) ->
+    init_auth(),
+
+    ExpireSec = 3,
+    WaitTime = timer:seconds(ExpireSec + 1),
+    ACL = emqx_authn_http_SUITE:acl_rules(),
+
+    set_user_handler(?T_ACL_USERNAME, ?T_ACL_PASSWORD, #{
+        acl => ACL,
+        expire_at =>
+            erlang:system_time(second) + ExpireSec
+    }),
+    {ok, Pid} = create_connection(?T_ACL_USERNAME, ?T_ACL_PASSWORD),
+
+    timer:sleep(WaitTime),
+    ?assertEqual(false, erlang:is_process_alive(Pid)).
+
 t_is_superuser() ->
     State = init_auth(),
     ok = test_is_superuser(State, false),
@@ -324,12 +319,12 @@ test_is_superuser(State, ExpectedIsSuperuser) ->
     Username = <<"u">>,
     Password = <<"p">>,
 
-    set_user_handler(Username, Password, ExpectedIsSuperuser),
+    set_user_handler(Username, Password, #{is_superuser => ExpectedIsSuperuser}),
 
     ClientFirstMessage = esasl_scram:client_first_message(Username),
 
     {continue, ServerFirstMessage, ServerCache} =
-        emqx_authn_scram_http:authenticate(
+        emqx_authn_scram_restapi:authenticate(
             #{
                 auth_method => <<"SCRAM-SHA-512">>,
                 auth_data => ClientFirstMessage,
@@ -349,7 +344,7 @@ test_is_superuser(State, ExpectedIsSuperuser) ->
         ),
 
     {ok, UserInfo1, ServerFinalMessage} =
-        emqx_authn_scram_http:authenticate(
+        emqx_authn_scram_restapi:authenticate(
             #{
                 auth_method => <<"SCRAM-SHA-512">>,
                 auth_data => ClientFinalMessage,
@@ -382,24 +377,25 @@ raw_config() ->
     }.
 
 set_user_handler(Username, Password) ->
-    set_user_handler(Username, Password, false).
-set_user_handler(Username, Password, IsSuperuser) ->
+    set_user_handler(Username, Password, #{is_superuser => false}).
+set_user_handler(Username, Password, Extra0) ->
     %% HTTP Server
     Handler = fun(Req0, State) ->
         #{
             username := Username
         } = cowboy_req:match_qs([username], Req0),
 
-        UserInfo = make_user_info(Password, ?ALGORITHM, ?ITERATION_COUNT, IsSuperuser),
+        UserInfo = make_user_info(Password, ?ALGORITHM, ?ITERATION_COUNT),
+        Extra = maps:merge(#{is_superuser => false}, Extra0),
         Req = cowboy_req:reply(
             200,
             #{<<"content-type">> => <<"application/json">>},
-            emqx_utils_json:encode(UserInfo),
+            emqx_utils_json:encode(maps:merge(Extra, UserInfo)),
             Req0
         ),
         {ok, Req, State}
     end,
-    ok = emqx_authn_scram_http_test_server:set_handler(Handler).
+    ok = emqx_authn_scram_restapi_test_server:set_handler(Handler).
 
 init_auth() ->
     init_auth(raw_config()).
@@ -413,7 +409,7 @@ init_auth(Config) ->
     {ok, [#{state := State}]} = emqx_authn_chains:list_authenticators(?GLOBAL),
     State.
 
-make_user_info(Password, Algorithm, IterationCount, IsSuperuser) ->
+make_user_info(Password, Algorithm, IterationCount) ->
     {StoredKey, ServerKey, Salt} = esasl_scram:generate_authentication_info(
         Password,
         #{
@@ -424,8 +420,7 @@ make_user_info(Password, Algorithm, IterationCount, IsSuperuser) ->
     #{
         stored_key => binary:encode_hex(StoredKey),
         server_key => binary:encode_hex(ServerKey),
-        salt => binary:encode_hex(Salt),
-        is_superuser => IsSuperuser
+        salt => binary:encode_hex(Salt)
     }.
 
 receive_packet() ->
@@ -436,3 +431,79 @@ receive_packet() ->
     after 1000 ->
         ct:fail("Deliver timeout")
     end.
+
+create_connection(Username, Password) ->
+    {ok, Pid} = emqx_authn_mqtt_test_client:start_link("127.0.0.1", 1883),
+
+    ClientFirstMessage = esasl_scram:client_first_message(Username),
+
+    ConnectPacket = ?CONNECT_PACKET(
+        #mqtt_packet_connect{
+            proto_ver = ?MQTT_PROTO_V5,
+            properties = #{
+                'Authentication-Method' => <<"SCRAM-SHA-512">>,
+                'Authentication-Data' => ClientFirstMessage
+            }
+        }
+    ),
+
+    ok = emqx_authn_mqtt_test_client:send(Pid, ConnectPacket),
+
+    %% Intentional sleep to trigger idle timeout for the connection not yet authenticated
+    ok = ct:sleep(1000),
+
+    ?AUTH_PACKET(
+        ?RC_CONTINUE_AUTHENTICATION,
+        #{'Authentication-Data' := ServerFirstMessage}
+    ) = receive_packet(),
+
+    {continue, ClientFinalMessage, ClientCache} =
+        esasl_scram:check_server_first_message(
+            ServerFirstMessage,
+            #{
+                client_first_message => ClientFirstMessage,
+                password => Password,
+                algorithm => ?ALGORITHM
+            }
+        ),
+
+    AuthContinuePacket = ?AUTH_PACKET(
+        ?RC_CONTINUE_AUTHENTICATION,
+        #{
+            'Authentication-Method' => <<"SCRAM-SHA-512">>,
+            'Authentication-Data' => ClientFinalMessage
+        }
+    ),
+
+    ok = emqx_authn_mqtt_test_client:send(Pid, AuthContinuePacket),
+
+    ?CONNACK_PACKET(
+        ?RC_SUCCESS,
+        _,
+        #{'Authentication-Data' := ServerFinalMessage}
+    ) = receive_packet(),
+
+    ok = esasl_scram:check_server_final_message(
+        ServerFinalMessage, ClientCache#{algorithm => ?ALGORITHM}
+    ),
+    {ok, Pid}.
+
+test_acl({allow, Topic}, C) ->
+    ?assertMatch(
+        [0],
+        send_subscribe(C, Topic)
+    );
+test_acl({deny, Topic}, C) ->
+    ?assertMatch(
+        [?RC_NOT_AUTHORIZED],
+        send_subscribe(C, Topic)
+    ).
+
+send_subscribe(Client, Topic) ->
+    TopicOpts = #{nl => 0, rap => 0, rh => 0, qos => 0},
+    Packet = ?SUBSCRIBE_PACKET(1, [{Topic, TopicOpts}]),
+    emqx_authn_mqtt_test_client:send(Client, Packet),
+    timer:sleep(200),
+
+    ?SUBACK_PACKET(1, ReasonCode) = receive_packet(),
+    ReasonCode.

+ 1 - 1
apps/emqx_auth_http/test/emqx_authn_scram_http_test_server.erl

@@ -2,7 +2,7 @@
 %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 
--module(emqx_authn_scram_http_test_server).
+-module(emqx_authn_scram_restapi_test_server).
 
 -behaviour(supervisor).
 -behaviour(cowboy_handler).

+ 3 - 1
apps/emqx_auth_mnesia/src/emqx_authn_scram_mnesia.erl

@@ -141,7 +141,9 @@ authenticate(
             reason => Reason
         })
     end,
-    emqx_utils_scram:authenticate(AuthMethod, AuthData, AuthCache, RetrieveFun, OnErrFun, State);
+    emqx_utils_scram:authenticate(
+        AuthMethod, AuthData, AuthCache, State, RetrieveFun, OnErrFun, [is_superuser]
+    );
 authenticate(_Credential, _State) ->
     ignore.
 

+ 1 - 1
apps/emqx_bridge_azure_event_hub/mix.exs

@@ -23,7 +23,7 @@ defmodule EMQXBridgeAzureEventHub.MixProject do
 
   def deps() do
     [
-      {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"},
+      {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
       {:brod, github: "kafka4beam/brod", tag: "3.18.0"},

+ 1 - 1
apps/emqx_bridge_azure_event_hub/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

+ 27 - 8
apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl

@@ -382,12 +382,31 @@ t_multiple_actions_sharing_topic(Config) ->
             ActionConfig0,
             #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
         ),
-    ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic(
-        [
-            {type, ?BRIDGE_TYPE_BIN},
-            {connector_name, ?config(connector_name, Config)},
-            {connector_config, ?config(connector_config, Config)},
-            {action_config, ActionConfig}
-        ]
-    ),
+    ok =
+        emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
+            [
+                {type, ?BRIDGE_TYPE_BIN},
+                {connector_name, ?config(connector_name, Config)},
+                {connector_config, ?config(connector_config, Config)},
+                {action_config, ActionConfig}
+            ]
+        ),
+    ok.
+
+t_dynamic_topics(Config) ->
+    ActionConfig0 = ?config(action_config, Config),
+    ActionConfig =
+        emqx_utils_maps:deep_merge(
+            ActionConfig0,
+            #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
+        ),
+    ok =
+        emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
+            [
+                {type, ?BRIDGE_TYPE_BIN},
+                {connector_name, ?config(connector_name, Config)},
+                {connector_config, ?config(connector_config, Config)},
+                {action_config, ActionConfig}
+            ]
+        ),
     ok.

+ 1 - 1
apps/emqx_bridge_confluent/mix.exs

@@ -23,7 +23,7 @@ defmodule EMQXBridgeConfluent.MixProject do
 
   def deps() do
     [
-      {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"},
+      {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
       {:brod, github: "kafka4beam/brod", tag: "3.18.0"},

+ 1 - 1
apps/emqx_bridge_confluent/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

+ 27 - 8
apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl

@@ -391,12 +391,31 @@ t_multiple_actions_sharing_topic(Config) ->
             ActionConfig0,
             #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
         ),
-    ok = emqx_bridge_v2_kafka_producer_SUITE:t_multiple_actions_sharing_topic(
-        [
-            {type, ?ACTION_TYPE_BIN},
-            {connector_name, ?config(connector_name, Config)},
-            {connector_config, ?config(connector_config, Config)},
-            {action_config, ActionConfig}
-        ]
-    ),
+    ok =
+        emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
+            [
+                {type, ?ACTION_TYPE_BIN},
+                {connector_name, ?config(connector_name, Config)},
+                {connector_config, ?config(connector_config, Config)},
+                {action_config, ActionConfig}
+            ]
+        ),
+    ok.
+
+t_dynamic_topics(Config) ->
+    ActionConfig0 = ?config(action_config, Config),
+    ActionConfig =
+        emqx_utils_maps:deep_merge(
+            ActionConfig0,
+            #{<<"parameters">> => #{<<"query_mode">> => <<"sync">>}}
+        ),
+    ok =
+        emqx_bridge_v2_kafka_producer_SUITE:?FUNCTION_NAME(
+            [
+                {type, ?ACTION_TYPE_BIN},
+                {connector_name, ?config(connector_name, Config)},
+                {connector_config, ?config(connector_config, Config)},
+                {action_config, ActionConfig}
+            ]
+        ),
     ok.

+ 1 - 1
apps/emqx_bridge_kafka/mix.exs

@@ -23,7 +23,7 @@ defmodule EMQXBridgeKafka.MixProject do
 
   def deps() do
     [
-      {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"},
+      {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
       {:brod, github: "kafka4beam/brod", tag: "3.18.0"},

+ 1 - 1
apps/emqx_bridge_kafka/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "2.0.0"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "3.0.2"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},

+ 30 - 5
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl

@@ -295,6 +295,7 @@ fields("config_producer") ->
 fields("config_consumer") ->
     fields(kafka_consumer);
 fields(kafka_producer) ->
+    %% Schema used by V1 bridges.
     connector_config_fields() ++ producer_opts(v1);
 fields(kafka_producer_action) ->
     emqx_bridge_v2_schema:common_fields() ++ producer_opts(action);
@@ -356,9 +357,33 @@ fields(socket_opts) ->
                 validator => fun emqx_schema:validate_tcp_keepalive/1
             })}
     ];
+fields(v1_producer_kafka_opts) ->
+    OldSchemaFields =
+        [
+            topic,
+            message,
+            max_batch_bytes,
+            compression,
+            partition_strategy,
+            required_acks,
+            kafka_headers,
+            kafka_ext_headers,
+            kafka_header_value_encode_mode,
+            partition_count_refresh_interval,
+            partitions_limit,
+            max_inflight,
+            buffer,
+            query_mode,
+            sync_query_timeout
+        ],
+    Fields = fields(producer_kafka_opts),
+    lists:filter(
+        fun({K, _V}) -> lists:member(K, OldSchemaFields) end,
+        Fields
+    );
 fields(producer_kafka_opts) ->
     [
-        {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})},
+        {topic, mk(emqx_schema:template(), #{required => true, desc => ?DESC(kafka_topic)})},
         {message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})},
         {max_batch_bytes,
             mk(emqx_schema:bytesize(), #{default => <<"896KB">>, desc => ?DESC(max_batch_bytes)})},
@@ -672,15 +697,15 @@ resource_opts() ->
 %% However we need to keep it backward compatible for generated schema json (version 0.1.0)
 %% since schema is data for the 'schemas' API.
 parameters_field(ActionOrBridgeV1) ->
-    {Name, Alias} =
+    {Name, Alias, Ref} =
         case ActionOrBridgeV1 of
             v1 ->
-                {kafka, parameters};
+                {kafka, parameters, v1_producer_kafka_opts};
             action ->
-                {parameters, kafka}
+                {parameters, kafka, producer_kafka_opts}
         end,
     {Name,
-        mk(ref(producer_kafka_opts), #{
+        mk(ref(Ref), #{
             required => true,
             aliases => [Alias],
             desc => ?DESC(producer_kafka_opts),

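Review note: `topic` is now typed as `emqx_schema:template()`, so a V2 action may carry placeholders, while `v1_producer_kafka_opts` filters the field list down to what bridge V1 supported. A minimal sketch of how fixed and dynamic values are told apart (the topic strings are illustrative; this mirrors `maybe_preproc_topic/1` in the producer module below):

    Fixed = emqx_template:parse(<<"events">>),
    [] = emqx_template:placeholders(Fixed),
    Dynamic = emqx_template:parse(<<"events-${payload.type}">>),
    [_ | _] = emqx_template:placeholders(Dynamic).
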
+ 114 - 51
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -3,6 +3,8 @@
 %%--------------------------------------------------------------------
 -module(emqx_bridge_kafka_impl_producer).
 
+-feature(maybe_expr, enable).
+
 -behaviour(emqx_resource).
 
 -include_lib("emqx_resource/include/emqx_resource.hrl").
@@ -125,8 +127,8 @@ on_add_channel(
     {ok, NewState}.
 
 create_producers_for_bridge_v2(
-    InstId,
-    BridgeV2Id,
+    ConnResId,
+    ActionResId,
     ClientId,
     #{
         bridge_type := BridgeType,
@@ -135,33 +137,42 @@ create_producers_for_bridge_v2(
 ) ->
     #{
         message := MessageTemplate,
-        topic := KafkaTopic,
+        topic := KafkaTopic0,
         sync_query_timeout := SyncQueryTimeout
     } = KafkaConfig,
+    TopicTemplate = {TopicType, TopicOrTemplate} = maybe_preproc_topic(KafkaTopic0),
+    MKafkaTopic =
+        case TopicType of
+            fixed -> TopicOrTemplate;
+            dynamic -> dynamic
+        end,
     KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)),
     KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])),
     KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none),
     MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions),
-    #{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id),
-    IsDryRun = emqx_resource:is_dry_run(BridgeV2Id),
-    ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions),
+    #{name := BridgeName} = emqx_bridge_v2:parse_id(ActionResId),
+    IsDryRun = emqx_resource:is_dry_run(ActionResId),
+    ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions),
     WolffProducerConfig = producers_config(
-        BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id
+        BridgeType, BridgeName, KafkaConfig, IsDryRun, ActionResId
     ),
-    case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of
+    case wolff:ensure_supervised_dynamic_producers(ClientId, WolffProducerConfig) of
         {ok, Producers} ->
-            ok = emqx_resource:allocate_resource(InstId, {?kafka_producers, BridgeV2Id}, Producers),
             ok = emqx_resource:allocate_resource(
-                InstId, {?kafka_telemetry_id, BridgeV2Id}, BridgeV2Id
+                ConnResId, {?kafka_producers, ActionResId}, Producers
+            ),
+            ok = emqx_resource:allocate_resource(
+                ConnResId, {?kafka_telemetry_id, ActionResId}, ActionResId
             ),
-            _ = maybe_install_wolff_telemetry_handlers(BridgeV2Id),
+            _ = maybe_install_wolff_telemetry_handlers(ActionResId),
             {ok, #{
                 message_template => compile_message_template(MessageTemplate),
                 kafka_client_id => ClientId,
-                kafka_topic => KafkaTopic,
+                topic_template => TopicTemplate,
+                topic => MKafkaTopic,
                 producers => Producers,
-                resource_id => BridgeV2Id,
-                connector_resource_id => InstId,
+                resource_id => ActionResId,
+                connector_resource_id => ConnResId,
                 sync_query_timeout => SyncQueryTimeout,
                 kafka_config => KafkaConfig,
                 headers_tokens => KafkaHeadersTokens,
@@ -172,9 +183,9 @@ create_producers_for_bridge_v2(
         {error, Reason2} ->
             ?SLOG(error, #{
                 msg => "failed_to_start_kafka_producer",
-                instance_id => InstId,
+                instance_id => ConnResId,
                 kafka_client_id => ClientId,
-                kafka_topic => KafkaTopic,
+                kafka_topic => MKafkaTopic,
                 reason => Reason2
             }),
             throw(
@@ -267,7 +278,9 @@ remove_producers_for_bridge_v2(
     ClientId = maps:get(?kafka_client_id, AllocatedResources, no_client_id),
     maps:foreach(
         fun
-            ({?kafka_producers, BridgeV2IdCheck}, Producers) when BridgeV2IdCheck =:= BridgeV2Id ->
+            ({?kafka_producers, BridgeV2IdCheck}, Producers) when
+                BridgeV2IdCheck =:= BridgeV2Id
+            ->
                 deallocate_producers(ClientId, Producers);
             ({?kafka_telemetry_id, BridgeV2IdCheck}, TelemetryId) when
                 BridgeV2IdCheck =:= BridgeV2Id
@@ -300,7 +313,8 @@ on_query(
     #{installed_bridge_v2s := BridgeV2Configs} = _ConnectorState
 ) ->
     #{
-        message_template := Template,
+        message_template := MessageTemplate,
+        topic_template := TopicTemplate,
         producers := Producers,
         sync_query_timeout := SyncTimeout,
         headers_tokens := KafkaHeadersTokens,
@@ -313,7 +327,8 @@ on_query(
         headers_val_encode_mode => KafkaHeadersValEncodeMode
     },
     try
-        KafkaMessage = render_message(Template, KafkaHeaders, Message),
+        KafkaTopic = render_topic(TopicTemplate, Message),
+        KafkaMessage = render_message(MessageTemplate, KafkaHeaders, Message),
         ?tp(
             emqx_bridge_kafka_impl_producer_sync_query,
             #{headers_config => KafkaHeaders, instance_id => InstId}
@@ -321,9 +336,15 @@ on_query(
         emqx_trace:rendered_action_template(MessageTag, #{
             message => KafkaMessage
         }),
-        do_send_msg(sync, KafkaMessage, Producers, SyncTimeout)
+        do_send_msg(sync, KafkaTopic, KafkaMessage, Producers, SyncTimeout)
     catch
-        error:{invalid_partition_count, Count, _Partitioner} ->
+        throw:bad_topic ->
+            ?tp("kafka_producer_failed_to_render_topic", #{}),
+            {error, {unrecoverable_error, failed_to_render_topic}};
+        throw:#{cause := unknown_topic_or_partition, topic := Topic} ->
+            ?tp("kafka_producer_resolved_to_unknown_topic", #{}),
+            {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}};
+        throw:#{cause := invalid_partition_count, count := Count} ->
             ?tp("kafka_producer_invalid_partition_count", #{
                 action_id => MessageTag,
                 query_mode => sync
@@ -368,6 +389,7 @@ on_query_async(
 ) ->
     #{
         message_template := Template,
+        topic_template := TopicTemplate,
         producers := Producers,
         headers_tokens := KafkaHeadersTokens,
         ext_headers_tokens := KafkaExtHeadersTokens,
@@ -379,6 +401,7 @@ on_query_async(
         headers_val_encode_mode => KafkaHeadersValEncodeMode
     },
     try
+        KafkaTopic = render_topic(TopicTemplate, Message),
         KafkaMessage = render_message(Template, KafkaHeaders, Message),
         ?tp(
             emqx_bridge_kafka_impl_producer_async_query,
@@ -387,9 +410,15 @@ on_query_async(
         emqx_trace:rendered_action_template(MessageTag, #{
             message => KafkaMessage
         }),
-        do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn)
+        do_send_msg(async, KafkaTopic, KafkaMessage, Producers, AsyncReplyFn)
     catch
-        error:{invalid_partition_count, Count, _Partitioner} ->
+        throw:bad_topic ->
+            ?tp("kafka_producer_failed_to_render_topic", #{}),
+            {error, {unrecoverable_error, failed_to_render_topic}};
+        throw:#{cause := unknown_topic_or_partition, topic := Topic} ->
+            ?tp("kafka_producer_resolved_to_unknown_topic", #{}),
+            {error, {unrecoverable_error, {resolved_to_unknown_topic, Topic}}};
+        throw:#{cause := invalid_partition_count, count := Count} ->
             ?tp("kafka_producer_invalid_partition_count", #{
                 action_id => MessageTag,
                 query_mode => async
@@ -427,9 +456,28 @@ compile_message_template(T) ->
         timestamp => preproc_tmpl(TimestampTemplate)
     }.
 
+maybe_preproc_topic(Topic) ->
+    Template = emqx_template:parse(Topic),
+    case emqx_template:placeholders(Template) of
+        [] ->
+            {fixed, bin(Topic)};
+        [_ | _] ->
+            {dynamic, Template}
+    end.
+
 preproc_tmpl(Tmpl) ->
     emqx_placeholder:preproc_tmpl(Tmpl).
 
+render_topic({fixed, KafkaTopic}, _Message) ->
+    KafkaTopic;
+render_topic({dynamic, Template}, Message) ->
+    try
+        iolist_to_binary(emqx_template:render_strict(Template, {emqx_jsonish, Message}))
+    catch
+        error:_Errors ->
+            throw(bad_topic)
+    end.
+
 render_message(
     #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate},
     #{
@@ -471,9 +519,11 @@ render_timestamp(Template, Message) ->
             erlang:system_time(millisecond)
     end.
 
-do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) ->
+do_send_msg(sync, KafkaTopic, KafkaMessage, Producers, SyncTimeout) ->
     try
-        {_Partition, _Offset} = wolff:send_sync(Producers, [KafkaMessage], SyncTimeout),
+        {_Partition, _Offset} = wolff:send_sync2(
+            Producers, KafkaTopic, [KafkaMessage], SyncTimeout
+        ),
         ok
     catch
         error:{producer_down, _} = Reason ->
@@ -481,7 +531,7 @@ do_send_msg(sync, KafkaMessage, Producers, SyncTimeout) ->
         error:timeout ->
             {error, timeout}
     end;
-do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) ->
+do_send_msg(async, KafkaTopic, KafkaMessage, Producers, AsyncReplyFn) ->
     %% * Must be a batch because wolff:send and wolff:send_sync are batch APIs
     %% * Must be a single element batch because wolff books calls, but not batch sizes
     %%   for counters and gauges.
@@ -489,7 +539,9 @@ do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn) ->
    %% The returned information is discarded here.
     %% If the producer process is down when sending, this function would
     %% raise an error exception which is to be caught by the caller of this callback
-    {_Partition, Pid} = wolff:send(Producers, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}),
+    {_Partition, Pid} = wolff:send2(
+        Producers, KafkaTopic, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}
+    ),
     %% this Pid is so far never used because Kafka producer is by-passing the buffer worker
     {ok, Pid}.
 
@@ -530,20 +582,23 @@ on_get_status(
     end.
 
 on_get_channel_status(
-    _ResId,
-    ChannelId,
+    _ConnResId,
+    ActionResId,
     #{
         client_id := ClientId,
         installed_bridge_v2s := Channels
-    } = _State
+    } = _ConnState
 ) ->
     %% Note: we must avoid returning `?status_disconnected' here. Returning
     %% `?status_disconnected' will make resource manager try to restart the producers /
     %% connector, thus potentially dropping data held in wolff producer's replayq.  The
     %% only exception is if the topic does not exist ("unhealthy target").
-    #{kafka_topic := KafkaTopic, partitions_limit := MaxPartitions} = maps:get(ChannelId, Channels),
+    #{
+        topic := MKafkaTopic,
+        partitions_limit := MaxPartitions
+    } = maps:get(ActionResId, Channels),
     try
-        ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions),
+        ok = check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions),
         ?status_connected
     catch
         throw:{unhealthy_target, Msg} ->
@@ -552,22 +607,29 @@ on_get_channel_status(
             {?status_connecting, {K, E}}
     end.
 
-check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions) ->
+check_topic_and_leader_connections(ActionResId, ClientId, MKafkaTopic, MaxPartitions) ->
     case wolff_client_sup:find_client(ClientId) of
         {ok, Pid} ->
-            ok = check_topic_status(ClientId, Pid, KafkaTopic),
-            ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions);
+            maybe
+                true ?= is_binary(MKafkaTopic),
+                ok = check_topic_status(ClientId, Pid, MKafkaTopic),
+                ok = check_if_healthy_leaders(
+                    ActionResId, ClientId, Pid, MKafkaTopic, MaxPartitions
+                )
+            else
+                false -> ok
+            end;
         {error, #{reason := no_such_client}} ->
             throw(#{
                 reason => cannot_find_kafka_client,
                 kafka_client => ClientId,
-                kafka_topic => KafkaTopic
+                kafka_topic => MKafkaTopic
             });
         {error, #{reason := client_supervisor_not_initialized}} ->
             throw(#{
                 reason => restarting,
                 kafka_client => ClientId,
-                kafka_topic => KafkaTopic
+                kafka_topic => MKafkaTopic
             })
     end.
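The `maybe ... else` block above is OTP's conditional match construct (EEP 49; experimental behind `-feature(maybe_expr, enable).` on OTP 25/26, enabled by default since OTP 27). A reduced sketch of the control flow, with `do_checks/1` as a hypothetical stand-in for the two checks:

    maybe
        %% `?=` diverts to the `else` clauses on a non-matching value, so a
        %% dynamic (non-binary) topic skips the static topic checks entirely.
        true ?= is_binary(MKafkaTopic),
        ok = do_checks(MKafkaTopic)
    else
        false -> ok
    end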
 
@@ -612,8 +674,10 @@ error_summary(Map, [Error]) ->
 error_summary(Map, [Error | More]) ->
     Map#{first_error => Error, total_errors => length(More) + 1}.
 
-check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) ->
-    case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of
+check_if_healthy_leaders(ActionResId, ClientId, ClientPid, KafkaTopic, MaxPartitions) when
+    is_pid(ClientPid)
+->
+    case wolff_client:get_leader_connections(ClientPid, ActionResId, KafkaTopic, MaxPartitions) of
         {ok, Leaders} ->
             %% Kafka is considered healthy as long as any of the partition leaders is reachable.
             case lists:partition(fun({_Partition, Pid}) -> is_alive(Pid) end, Leaders) of
@@ -675,7 +739,7 @@ ssl(#{enable := true} = SSL) ->
 ssl(_) ->
     false.
 
-producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
+producers_config(BridgeType, BridgeName, Input, IsDryRun, ActionResId) ->
     #{
         max_batch_bytes := MaxBatchBytes,
         compression := Compression,
@@ -717,8 +781,8 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
         max_batch_bytes => MaxBatchBytes,
         max_send_ahead => MaxInflight - 1,
         compression => Compression,
-        alias => BridgeV2Id,
-        telemetry_meta_data => #{bridge_id => BridgeV2Id},
+        group => ActionResId,
+        telemetry_meta_data => #{bridge_id => ActionResId},
         max_partitions => MaxPartitions
     }.
 
@@ -794,20 +858,19 @@ handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) ->
 %% Note: don't use the instance/manager ID, as that changes every time
 %% the bridge is recreated, and will lead to multiplication of
 %% metrics.
--spec telemetry_handler_id(resource_id()) -> binary().
-telemetry_handler_id(ResourceID) ->
-    <<"emqx-bridge-kafka-producer-", ResourceID/binary>>.
+-spec telemetry_handler_id(action_resource_id()) -> binary().
+telemetry_handler_id(ActionResId) ->
+    ActionResId.
 
-uninstall_telemetry_handlers(ResourceID) ->
-    HandlerID = telemetry_handler_id(ResourceID),
-    telemetry:detach(HandlerID).
+uninstall_telemetry_handlers(TelemetryId) ->
+    telemetry:detach(TelemetryId).
 
-maybe_install_wolff_telemetry_handlers(ResourceID) ->
+maybe_install_wolff_telemetry_handlers(TelemetryId) ->
     %% Attach event handlers for Kafka telemetry events. If a handler with the
     %% handler id already exists, the attach_many function does nothing
     telemetry:attach_many(
         %% unique handler id
-        telemetry_handler_id(ResourceID),
+        telemetry_handler_id(TelemetryId),
         [
             [wolff, dropped_queue_full],
             [wolff, queuing],
@@ -819,7 +882,7 @@ maybe_install_wolff_telemetry_handlers(ResourceID) ->
         %% wolff producers; otherwise, multiple kafka producer bridges
         %% will install multiple handlers to the same wolff events,
         %% multiplying the metric counts...
-        #{bridge_id => ResourceID}
+        #{bridge_id => TelemetryId}
     ).
 
 preproc_kafka_headers(HeadersTmpl) when HeadersTmpl =:= <<>>; HeadersTmpl =:= undefined ->

+ 12 - 1
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_producer_action_info.erl

@@ -26,7 +26,12 @@ schema_module() -> emqx_bridge_kafka.
 connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
     BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig),
     BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1),
-    emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2).
+    BridgeV1Config = emqx_utils_maps:rename(<<"parameters">>, <<"kafka">>, BridgeV1Config2),
+    maps:update_with(
+        <<"kafka">>,
+        fun(Params) -> maps:with(v1_parameters(), Params) end,
+        BridgeV1Config
+    ).
 
 bridge_v1_config_to_action_config(BridgeV1Conf0 = #{<<"producer">> := _}, ConnectorName) ->
     %% Ancient v1 config, when `kafka' key was wrapped by `producer'
@@ -51,6 +56,12 @@ bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
 %% Internal helper functions
 %%------------------------------------------------------------------------------------------
 
+v1_parameters() ->
+    [
+        to_bin(K)
+     || {K, _} <- emqx_bridge_kafka:fields(v1_producer_kafka_opts)
+    ].
+
 producer_action_field_keys() ->
     [
         to_bin(K)

+ 3 - 3
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl

@@ -477,7 +477,7 @@ do_start_producer(KafkaClientId, KafkaTopic) ->
     ProducerConfig =
         #{
             name => Name,
-            partitioner => roundrobin,
+            partitioner => random,
             partition_count_refresh_interval_seconds => 1_000,
             replayq_max_total_bytes => 10_000,
             replayq_seg_bytes => 9_000,
@@ -1520,7 +1520,7 @@ t_receive_after_recovery(Config) ->
                     key => <<"commit", (integer_to_binary(N))/binary>>,
                     value => <<"commit", (integer_to_binary(N))/binary>>
                 }
-             || N <- lists:seq(1, NPartitions)
+             || N <- lists:seq(1, NPartitions * 10)
             ],
             %% we do distinct passes over this producing part so that
             %% wolff won't batch everything together.
@@ -1933,7 +1933,7 @@ t_node_joins_existing_cluster(Config) ->
                     Val = <<"v", (integer_to_binary(N))/binary>>,
                     publish(Config, KafkaTopic, [#{key => Key, value => Val}])
                 end,
-                lists:seq(1, NPartitions)
+                lists:seq(1, 10 * NPartitions)
             ),
             {ok, _} = snabbkaffe:receive_events(SRef1),
 

+ 162 - 6
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -23,6 +23,7 @@
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("brod/include/brod.hrl").
 -include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("emqx/include/asserts.hrl").
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
@@ -165,6 +166,9 @@ send_message(Type, ActionName) ->
 
 resolve_kafka_offset() ->
     KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
+    resolve_kafka_offset(KafkaTopic).
+
+resolve_kafka_offset(KafkaTopic) ->
     Partition = 0,
     Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
     {ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset(
@@ -174,11 +178,32 @@ resolve_kafka_offset() ->
 
 check_kafka_message_payload(Offset, ExpectedPayload) ->
     KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
+    check_kafka_message_payload(KafkaTopic, Offset, ExpectedPayload).
+
+check_kafka_message_payload(KafkaTopic, Offset, ExpectedPayload) ->
     Partition = 0,
     Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
     {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset),
     ?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0).
 
+ensure_kafka_topic(KafkaTopic) ->
+    TopicConfigs = [
+        #{
+            name => KafkaTopic,
+            num_partitions => 1,
+            replication_factor => 1,
+            assignments => [],
+            configs => []
+        }
+    ],
+    RequestConfig = #{timeout => 5_000},
+    ConnConfig = #{},
+    Endpoints = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
+    case brod:create_topics(Endpoints, TopicConfigs, RequestConfig, ConnConfig) of
+        ok -> ok;
+        {error, topic_already_exists} -> ok
+    end.
+
 action_config(ConnectorName) ->
     action_config(ConnectorName, _Overrides = #{}).
 
@@ -768,9 +793,13 @@ t_invalid_partition_count_metrics(Config) ->
             %% Simulate `invalid_partition_count'
             emqx_common_test_helpers:with_mock(
                 wolff,
-                send_sync,
-                fun(_Producers, _Msgs, _Timeout) ->
-                    error({invalid_partition_count, 0, partitioner})
+                send_sync2,
+                fun(_Producers, _Topic, _Msgs, _Timeout) ->
+                    throw(#{
+                        cause => invalid_partition_count,
+                        count => 0,
+                        partitioner => partitioner
+                    })
                 end,
                 fun() ->
                     {{ok, _}, {ok, _}} =
@@ -813,9 +842,13 @@ t_invalid_partition_count_metrics(Config) ->
             %% Simulate `invalid_partition_count'
             emqx_common_test_helpers:with_mock(
                 wolff,
-                send,
-                fun(_Producers, _Msgs, _Timeout) ->
-                    error({invalid_partition_count, 0, partitioner})
+                send2,
+                fun(_Producers, _Topic, _Msgs, _AckCallback) ->
+                    throw(#{
+                        cause => invalid_partition_count,
+                        count => 0,
+                        partitioner => partitioner
+                    })
                 end,
                 fun() ->
                     {{ok, _}, {ok, _}} =
@@ -921,3 +954,126 @@ t_multiple_actions_sharing_topic(Config) ->
         end
     ),
     ok.
+
+%% Smoke tests for using a templated topic and dynamic Kafka topics.
+t_dynamic_topics(Config) ->
+    Type = proplists:get_value(type, Config, ?TYPE),
+    ConnectorName = proplists:get_value(connector_name, Config, <<"c">>),
+    ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()),
+    ActionName = <<"dynamic_topics">>,
+    ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)),
+    PreConfiguredTopic1 = <<"pct1">>,
+    PreConfiguredTopic2 = <<"pct2">>,
+    ensure_kafka_topic(PreConfiguredTopic1),
+    ensure_kafka_topic(PreConfiguredTopic2),
+    ActionConfig = emqx_bridge_v2_testlib:parse_and_check(
+        action,
+        Type,
+        ActionName,
+        emqx_utils_maps:deep_merge(
+            ActionConfig1,
+            #{
+                <<"parameters">> => #{
+                    <<"topic">> => <<"pct${.payload.n}">>,
+                    <<"message">> => #{
+                        <<"key">> => <<"${.clientid}">>,
+                        <<"value">> => <<"${.payload.p}">>
+                    }
+                }
+            }
+        )
+    ),
+    ?check_trace(
+        #{timetrap => 7_000},
+        begin
+            ConnectorParams = [
+                {connector_config, ConnectorConfig},
+                {connector_name, ConnectorName},
+                {connector_type, Type}
+            ],
+            ActionParams = [
+                {action_config, ActionConfig},
+                {action_name, ActionName},
+                {action_type, Type}
+            ],
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_connector_api(ConnectorParams),
+
+            {ok, {{_, 201, _}, _, #{}}} =
+                emqx_bridge_v2_testlib:create_action_api(ActionParams),
+            RuleTopic = <<"pct">>,
+            {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http(
+                Type,
+                RuleTopic,
+                [
+                    {bridge_name, ActionName}
+                ]
+            ),
+            ?assertStatusAPI(Type, ActionName, <<"connected">>),
+
+            HandlerId = ?FUNCTION_NAME,
+            TestPid = self(),
+            telemetry:attach_many(
+                HandlerId,
+                emqx_resource_metrics:events(),
+                fun(EventName, Measurements, Metadata, _Config) ->
+                    Data = #{
+                        name => EventName,
+                        measurements => Measurements,
+                        metadata => Metadata
+                    },
+                    TestPid ! {telemetry, Data},
+                    ok
+                end,
+                unused_config
+            ),
+            on_exit(fun() -> telemetry:detach(HandlerId) end),
+
+            {ok, C} = emqtt:start_link(#{}),
+            {ok, _} = emqtt:connect(C),
+            Payload = fun(Map) -> emqx_utils_json:encode(Map) end,
+            Offset1 = resolve_kafka_offset(PreConfiguredTopic1),
+            Offset2 = resolve_kafka_offset(PreConfiguredTopic2),
+            {ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 1, p => <<"p1">>}), [{qos, 1}]),
+            {ok, _} = emqtt:publish(C, RuleTopic, Payload(#{n => 2, p => <<"p2">>}), [{qos, 1}]),
+
+            check_kafka_message_payload(PreConfiguredTopic1, Offset1, <<"p1">>),
+            check_kafka_message_payload(PreConfiguredTopic2, Offset2, <<"p2">>),
+
+            ActionId = emqx_bridge_v2:id(Type, ActionName),
+            ?assertEqual(2, emqx_resource_metrics:matched_get(ActionId)),
+            ?assertEqual(2, emqx_resource_metrics:success_get(ActionId)),
+            ?assertEqual(0, emqx_resource_metrics:queuing_get(ActionId)),
+
+            ?assertReceive(
+                {telemetry, #{
+                    measurements := #{gauge_set := _},
+                    metadata := #{worker_id := _, resource_id := ActionId}
+                }}
+            ),
+
+            %% If there isn't enough information in the context to resolve to a topic, it
+            %% should be an unrecoverable error.
+            ?assertMatch(
+                {_, {ok, _}},
+                ?wait_async_action(
+                    emqtt:publish(C, RuleTopic, Payload(#{not_enough => <<"info">>}), [{qos, 1}]),
+                    #{?snk_kind := "kafka_producer_failed_to_render_topic"}
+                )
+            ),
+
+            %% If it's possible to render the topic, but it isn't in the pre-configured
+            %% list, it should be an unrecoverable error.
+            ?assertMatch(
+                {_, {ok, _}},
+                ?wait_async_action(
+                    emqtt:publish(C, RuleTopic, Payload(#{n => 99}), [{qos, 1}]),
+                    #{?snk_kind := "kafka_producer_resolved_to_unknown_topic"}
+                )
+            ),
+
+            ok
+        end,
+        []
+    ),
+    ok.

+ 2 - 1
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -83,7 +83,8 @@
     dropped_msg_due_to_mqueue_is_full,
     socket_receive_paused_by_rate_limit,
     data_bridge_buffer_overflow,
-    external_broker_crashed
+    external_broker_crashed,
+    unrecoverable_resource_error
 ]).
 
 -define(DEFAULT_RPC_PORT, 5369).

+ 1 - 1
apps/emqx_conf/src/emqx_conf_schema_inject.erl

@@ -63,7 +63,7 @@ authn_mods(ee) ->
     authn_mods(ce) ++
         [
             emqx_gcp_device_authn_schema,
-            emqx_authn_scram_http_schema
+            emqx_authn_scram_restapi_schema
         ].
 
 authz() ->

+ 1 - 1
apps/emqx_gateway/src/emqx_gateway_api_authn.erl

@@ -383,7 +383,7 @@ schema_authn() ->
     emqx_dashboard_swagger:schema_with_examples(
         emqx_authn_schema:authenticator_type_without([
             emqx_authn_scram_mnesia_schema,
-            emqx_authn_scram_http_schema
+            emqx_authn_scram_restapi_schema
         ]),
         emqx_authn_api:authenticator_examples()
     ).

+ 39 - 2
apps/emqx_plugins/src/emqx_plugins.erl

@@ -358,13 +358,13 @@ get_config_bin(NameVsn) ->
 %% RPC call from Management API or CLI.
 %% The plugin config JSON map must have been validated against the Avro schema,
 %% or otherwise be guaranteed valid before this function is called.
-put_config(NameVsn, ConfigJsonMap, AvroValue) when not is_binary(NameVsn) ->
+put_config(NameVsn, ConfigJsonMap, AvroValue) when (not is_binary(NameVsn)) ->
     put_config(bin(NameVsn), ConfigJsonMap, AvroValue);
 put_config(NameVsn, ConfigJsonMap, _AvroValue) ->
     HoconBin = hocon_pp:do(ConfigJsonMap, #{}),
     ok = backup_and_write_hocon_bin(NameVsn, HoconBin),
-    %% TODO: callback in plugin's on_config_changed (config update by mgmt API)
     %% TODO: callback in plugin's on_config_upgraded (config vsn upgrade v1 -> v2)
+    ok = maybe_call_on_config_changed(NameVsn, ConfigJsonMap),
     ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), ConfigJsonMap),
     ok.
 
@@ -375,6 +375,43 @@ restart(NameVsn) ->
         {error, Reason} -> {error, Reason}
     end.
 
+%% @doc Call plugin's callback on_config_changed/2
+maybe_call_on_config_changed(NameVsn, NewConf) ->
+    FuncName = on_config_changed,
+    maybe
+        {ok, PluginAppModule} ?= app_module_name(NameVsn),
+        true ?= erlang:function_exported(PluginAppModule, FuncName, 2),
+        {ok, OldConf} = get_config(NameVsn),
+        try erlang:apply(PluginAppModule, FuncName, [OldConf, NewConf]) of
+            _ -> ok
+        catch
+            Class:CatchReason:Stacktrace ->
+                ?SLOG(error, #{
+                    msg => "failed_to_call_on_config_changed",
+                    exception => Class,
+                    reason => CatchReason,
+                    stacktrace => Stacktrace
+                }),
+                ok
+        end
+    else
+        {error, Reason} ->
+            ?SLOG(info, #{msg => "failed_to_call_on_config_changed", reason => Reason});
+        false ->
+            ?SLOG(info, #{msg => "on_config_changed_callback_not_exported"});
+        _ ->
+            ok
+    end.
+
+app_module_name(NameVsn) ->
+    case read_plugin_info(NameVsn, #{}) of
+        {ok, #{<<"name">> := Name} = _PluginInfo} ->
+            emqx_utils:safe_to_existing_atom(<<Name/binary, "_app">>);
+        {error, Reason} ->
+            ?SLOG(error, Reason#{msg => "failed_to_read_plugin_info"}),
+            {error, Reason}
+    end.
+
 %% @doc List all installed plugins.
 %% Including the ones that are installed, but not enabled in config.
 -spec list() -> [plugin_info()].

+ 54 - 24
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -298,10 +298,10 @@ running(info, {flush_metrics, _Ref}, _Data) ->
 running(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
     is_map_key(Pid, AsyncWorkers0)
 ->
-    ?SLOG(info, #{msg => "async_worker_died", state => running, reason => Reason}),
+    ?SLOG(info, #{msg => "async_worker_died", state => running, reason => Reason}, #{tag => ?TAG}),
     handle_async_worker_down(Data0, Pid);
 running(info, Info, _St) ->
-    ?SLOG(error, #{msg => "unexpected_msg", state => running, info => Info}),
+    ?SLOG(error, #{msg => "unexpected_msg", state => running, info => Info}, #{tag => ?TAG}),
     keep_state_and_data.
 
 blocked(enter, _, #{resume_interval := ResumeT} = St0) ->
@@ -331,10 +331,10 @@ blocked(info, {flush_metrics, _Ref}, _Data) ->
 blocked(info, {'DOWN', _MRef, process, Pid, Reason}, Data0 = #{async_workers := AsyncWorkers0}) when
     is_map_key(Pid, AsyncWorkers0)
 ->
-    ?SLOG(info, #{msg => "async_worker_died", state => blocked, reason => Reason}),
+    ?SLOG(info, #{msg => "async_worker_died", state => blocked, reason => Reason}, #{tag => ?TAG}),
     handle_async_worker_down(Data0, Pid);
 blocked(info, Info, _Data) ->
-    ?SLOG(error, #{msg => "unexpected_msg", state => blocked, info => Info}),
+    ?SLOG(error, #{msg => "unexpected_msg", state => blocked, info => Info}, #{tag => ?TAG}),
     keep_state_and_data.
 
 terminate(_Reason, #{id := Id, index := Index, queue := Q}) ->
@@ -981,7 +981,16 @@ handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCTX) ->
         true ->
             PostFn =
                 fun() ->
-                    ?SLOG(error, #{id => Id, msg => "unrecoverable_error", reason => Reason}),
+                    ?SLOG_THROTTLE(
+                        error,
+                        Id,
+                        #{
+                            resource_id => Id,
+                            msg => unrecoverable_resource_error,
+                            reason => Reason
+                        },
+                        #{tag => ?TAG}
+                    ),
                     ok
                 end,
             Counters =
@@ -1021,7 +1030,16 @@ handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCT
         true ->
             PostFn =
                 fun() ->
-                    ?SLOG(error, #{id => Id, msg => "unrecoverable_error", reason => Reason}),
+                    ?SLOG_THROTTLE(
+                        error,
+                        Id,
+                        #{
+                            resource_id => Id,
+                            msg => unrecoverable_resource_error,
+                            reason => Reason
+                        },
+                        #{tag => ?TAG}
+                    ),
                     ok
                 end,
             Counters =
@@ -1141,12 +1159,16 @@ log_expired_message_count(_Data = #{id := Id, index := Index, counters := Counte
         false ->
             ok;
         true ->
-            ?SLOG(info, #{
-                msg => "buffer_worker_dropped_expired_messages",
-                resource_id => Id,
-                worker_index => Index,
-                expired_count => ExpiredCount
-            }),
+            ?SLOG(
+                info,
+                #{
+                    msg => "buffer_worker_dropped_expired_messages",
+                    resource_id => Id,
+                    worker_index => Index,
+                    expired_count => ExpiredCount
+                },
+                #{tag => ?TAG}
+            ),
             ok
     end.
 
@@ -1556,7 +1578,7 @@ handle_async_reply1(
     case is_expired(ExpireAt, Now) of
         true ->
             IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid),
-            %% evalutate metrics call here since we're not inside
+            %% evaluate metrics call here since we're not inside
             %% buffer worker
             IsAcked andalso
                 begin
@@ -1797,12 +1819,16 @@ append_queue(Id, Index, Q, Queries) ->
                 ok = replayq:ack(Q1, QAckRef),
                 Dropped = length(Items2),
                 Counters = #{dropped_queue_full => Dropped},
-                ?SLOG_THROTTLE(warning, #{
-                    msg => data_bridge_buffer_overflow,
-                    resource_id => Id,
-                    worker_index => Index,
-                    dropped => Dropped
-                }),
+                ?SLOG_THROTTLE(
+                    warning,
+                    #{
+                        msg => data_bridge_buffer_overflow,
+                        resource_id => Id,
+                        worker_index => Index,
+                        dropped => Dropped
+                    },
+                    #{tag => ?TAG}
+                ),
                 {Items2, Q1, Counters}
         end,
     ?tp(
@@ -2236,11 +2262,15 @@ adjust_batch_time(Id, RequestTTL, BatchTime0) ->
     BatchTime = max(0, min(BatchTime0, RequestTTL div 2)),
     case BatchTime =:= BatchTime0 of
         false ->
-            ?SLOG(info, #{
-                id => Id,
-                msg => "adjusting_buffer_worker_batch_time",
-                new_batch_time => BatchTime
-            });
+            ?SLOG(
+                info,
+                #{
+                    resource_id => Id,
+                    msg => "adjusting_buffer_worker_batch_time",
+                    new_batch_time => BatchTime
+                },
+                #{tag => ?TAG}
+            );
         true ->
             ok
     end,

+ 5 - 3
apps/emqx_rule_engine/src/emqx_rule_sqltester.erl

@@ -115,11 +115,13 @@ test(#{sql := Sql, context := Context}) ->
                 true ->
                     %% test if the topic matches the topic filters in the rule
                     case emqx_topic:match_any(InTopic, EventTopics) of
-                        true -> test_rule(Sql, Select, Context, EventTopics);
-                        false -> {error, nomatch}
+                        true ->
+                            test_rule(Sql, Select, Context, EventTopics);
+                        false ->
+                            {error, nomatch}
                     end;
                 false ->
-                    case lists:member(InTopic, EventTopics) of
+                    case emqx_topic:match_any(InTopic, EventTopics) of
                         true ->
                             %% the rule is for both publish and events, test it directly
                             test_rule(Sql, Select, Context, EventTopics);

+ 33 - 0
apps/emqx_rule_engine/test/emqx_rule_engine_api_2_SUITE.erl

@@ -393,6 +393,38 @@ t_rule_test_smoke(_Config) ->
         }
     ],
     MultipleFrom = [
+        #{
+            expected => #{code => 200},
+            input =>
+                #{
+                    <<"context">> =>
+                        #{
+                            <<"clientid">> => <<"c_emqx">>,
+                            <<"event_type">> => <<"message_publish">>,
+                            <<"qos">> => 1,
+                            <<"topic">> => <<"t/a">>,
+                            <<"username">> => <<"u_emqx">>
+                        },
+                    <<"sql">> =>
+                        <<"SELECT\n  *\nFROM\n  \"t/#\", \"$bridges/mqtt:source\" ">>
+                }
+        },
+        #{
+            expected => #{code => 200},
+            input =>
+                #{
+                    <<"context">> =>
+                        #{
+                            <<"clientid">> => <<"c_emqx">>,
+                            <<"event_type">> => <<"message_publish">>,
+                            <<"qos">> => 1,
+                            <<"topic">> => <<"t/a">>,
+                            <<"username">> => <<"u_emqx">>
+                        },
+                    <<"sql">> =>
+                        <<"SELECT\n  *\nFROM\n  \"t/#\", \"$sources/mqtt:source\" ">>
+                }
+        },
         #{
             expected => #{code => 200},
             input =>
@@ -488,6 +520,7 @@ do_t_rule_test_smoke(#{input := Input, expected := #{code := ExpectedCode}} = Ca
             {true, #{
                 expected => ExpectedCode,
                 hint => maps:get(hint, Case, <<>>),
+                input => Input,
                 got => Code,
                 resp_body => Body
             }}

+ 5 - 7
apps/emqx_utils/src/emqx_utils_scram.erl

@@ -16,17 +16,17 @@
 
 -module(emqx_utils_scram).
 
--export([authenticate/6]).
+-export([authenticate/7]).
 
 %%------------------------------------------------------------------------------
 %% Authentication
 %%------------------------------------------------------------------------------
-authenticate(AuthMethod, AuthData, AuthCache, RetrieveFun, OnErrFun, Conf) ->
+authenticate(AuthMethod, AuthData, AuthCache, Conf, RetrieveFun, OnErrFun, ResultKeys) ->
     case ensure_auth_method(AuthMethod, AuthData, Conf) of
         true ->
             case AuthCache of
                 #{next_step := client_final} ->
-                    check_client_final_message(AuthData, AuthCache, Conf, OnErrFun);
+                    check_client_final_message(AuthData, AuthCache, Conf, OnErrFun, ResultKeys);
                 _ ->
                     check_client_first_message(AuthData, AuthCache, Conf, RetrieveFun, OnErrFun)
             end;
@@ -64,9 +64,7 @@ check_client_first_message(
             {error, not_authorized}
     end.
 
-check_client_final_message(
-    Bin, #{is_superuser := IsSuperuser} = Cache, #{algorithm := Alg}, OnErrFun
-) ->
+check_client_final_message(Bin, Cache, #{algorithm := Alg}, OnErrFun, ResultKeys) ->
     case
         esasl_scram:check_client_final_message(
             Bin,
@@ -74,7 +72,7 @@ check_client_final_message(
         )
     of
         {ok, ServerFinalMessage} ->
-            {ok, #{is_superuser => IsSuperuser}, ServerFinalMessage};
+            {ok, maps:with(ResultKeys, Cache), ServerFinalMessage};
         {error, Reason} ->
             OnErrFun("check_client_final_message_error", Reason),
             {error, not_authorized}
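Returning `maps:with(ResultKeys, Cache)` lets each caller pick which cached fields end up in the authentication result; the keys correspond to the extension fields listed in the `feat-13504` changelog entry below. Illustrative filtering (cache contents hypothetical):

    %% Only the requested keys survive; SCRAM-internal state stays private.
    #{is_superuser := false, expire_at := 0} =
        maps:with(
            [is_superuser, client_attrs, expire_at, acl],
            #{is_superuser => false, expire_at => 0, next_step => client_final}
        ).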

+ 1 - 1
apps/emqx_utils/src/emqx_variform_bif.erl

@@ -583,7 +583,7 @@ getenv(Bin) when is_binary(Bin) ->
     EnvKey = ?ENV_CACHE(Bin),
     case persistent_term:get(EnvKey, undefined) of
         undefined ->
-            Name = erlang:binary_to_list(Bin),
+            Name = "EMQXVAR_" ++ erlang:binary_to_list(Bin),
             Result =
                 case os:getenv(Name) of
                     false ->

+ 1 - 1
apps/emqx_utils/test/emqx_variform_bif_tests.erl

@@ -77,5 +77,5 @@ system_test() ->
     EnvName = erlang:atom_to_list(?MODULE),
     EnvVal = erlang:atom_to_list(?FUNCTION_NAME),
     EnvNameBin = erlang:list_to_binary(EnvName),
-    os:putenv(EnvName, EnvVal),
+    os:putenv("EMQXVAR_" ++ EnvName, EnvVal),
     ?assertEqual(erlang:list_to_binary(EnvVal), emqx_variform_bif:getenv(EnvNameBin)).

+ 4 - 2
changes/ce/feat-13507.en.md

@@ -1,2 +1,4 @@
-Added a new builtin function `getenv` in the rule engine and variform expression to access the environment variables.
-Note this value is immutable once loaded from the environment.
+Added a new builtin function `getenv` to the rule engine and variform expressions for accessing environment variables, with the following limitations:
+
+- The prefix `EMQXVAR_` is prepended to the name before reading from the OS environment, i.e. `getenv('FOO_BAR')` reads `EMQXVAR_FOO_BAR`.
+- Values are immutable once loaded from the OS environment.
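A quick illustration of the prefixing and caching behavior, following the updated `emqx_variform_bif` test above:

    %% The BIF reads EMQXVAR_<Name> and caches the value in a persistent_term,
    %% so later changes to the OS environment are not observed.
    os:putenv("EMQXVAR_FOO_BAR", "baz"),
    <<"baz">> = emqx_variform_bif:getenv(<<"FOO_BAR">>).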

+ 1 - 0
changes/ce/feat-13528.en.md

@@ -0,0 +1 @@
+Added log throttling for unrecoverable errors in data integrations.

+ 6 - 0
changes/ce/feat-13548.en.md

@@ -0,0 +1,6 @@
+The plugin's `on_config_changed/2` callback function is now invoked (when exported) after the plugin configuration is updated via the REST API.
+
+This callback function is expected to be exported by the `<PluginName>_app` module.
+
+For example, for the plugin `my_plugin-1.0.0`, the callback is
+`my_plugin_app:on_config_changed/2`.
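A minimal sketch of a plugin app module exporting the callback (plugin name hypothetical). Per the `emqx_plugins` hunk above, the return value is ignored and exceptions are logged rather than propagated:

    -module(my_plugin_app).
    -export([on_config_changed/2]).

    %% Invoked with the old and new config maps when the plugin config is
    %% updated via the REST API.
    on_config_changed(_OldConf, _NewConf) ->
        ok.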

+ 1 - 0
changes/ce/fix-13527.en.md

@@ -0,0 +1 @@
+Fixed an issue in the Rule Engine where testing a SQL statement against the Message Publish event would always yield no results when a `$bridges/...` source was included in the `FROM` clause.
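The fix (see the `emqx_rule_sqltester` hunk above) matches the test topic against the `FROM` clause as topic filters instead of requiring exact list membership:

    %% Previously lists:member/2 returned false here, so the test yielded nothing.
    true = emqx_topic:match_any(<<"t/a">>, [<<"t/#">>, <<"$bridges/mqtt:source">>]).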

+ 5 - 0
changes/ee/feat-13452.en.md

@@ -0,0 +1,5 @@
+Kafka producer action's `topic` config now supports templates.
+
+The topics must already exist in Kafka. If a message is rendered to a topic that does not exist in Kafka (assuming topic auto-creation is disabled in Kafka), the message fails with an unrecoverable error. Likewise, if a message does not contain enough information to render the configured template (e.g. the template is `t-${t}` and the message context does not define `t`), the message fails with an unrecoverable error.
+
+The same feature is also available for the Azure Event Hubs and Confluent Platform producer integrations.

+ 4 - 0
changes/ee/feat-13504.en.md

@@ -1 +1,5 @@
 Added an HTTP backend for the authentication mechanism `scram`.
+
+Note: this is not an implementation of RFC 7804 (Salted Challenge Response HTTP Authentication Mechanism).
+
+This backend is an implementation of SCRAM that uses an external web resource as a source of SCRAM authentication data, including the client's stored key, the server key, and the salt. It supports the same authentication and authorization extension fields as the HTTP auth backend, namely `is_superuser`, `client_attrs`, `expire_at` and `acl`.

+ 1 - 1
mix.exs

@@ -387,7 +387,7 @@ defmodule EMQXUmbrella.MixProject do
       {:hstreamdb_erl,
        github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"},
       {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
-      {:wolff, github: "kafka4beam/wolff", tag: "2.0.0"},
+      {:wolff, github: "kafka4beam/wolff", tag: "3.0.2"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
       {:brod, github: "kafka4beam/brod", tag: "3.18.0"},

+ 1 - 1
rel/i18n/emqx_bridge_azure_event_hub.hocon

@@ -69,7 +69,7 @@ producer_kafka_opts.label:
 """Azure Event Hubs Producer"""
 
 kafka_topic.desc:
-"""Event Hubs name"""
+"""Event Hubs name.  Supports templates (e.g.: `t-${payload.t}`)."""
 
 kafka_topic.label:
 """Event Hubs Name"""

+ 2 - 2
rel/i18n/emqx_bridge_confluent_producer.hocon

@@ -69,10 +69,10 @@ producer_kafka_opts.label:
 """Confluent Producer"""
 
 kafka_topic.desc:
-"""Event Hub name"""
+"""Kafka topic name.  Supports templates (e.g.: `t-${payload.t}`)."""
 
 kafka_topic.label:
-"""Event Hub Name"""
+"""Kafka Topic Name"""
 
 kafka_message_timestamp.desc:
 """Which timestamp to use. The timestamp is expected to be a millisecond precision Unix epoch which can be in string format, e.g. <code>1661326462115</code> or <code>'1661326462115'</code>. When the desired data field for this template is not found, or if the found data is not a valid integer, the current system timestamp will be used."""

+ 1 - 1
rel/i18n/emqx_bridge_kafka.hocon

@@ -81,7 +81,7 @@ producer_kafka_opts.label:
 """Kafka Producer"""
 
 kafka_topic.desc:
-"""Kafka topic name"""
+"""Kafka topic name.  Supports templates (e.g.: `t-${payload.t}`)."""
 
 kafka_topic.label:
 """Kafka Topic Name"""