소스 검색

Merge remote-tracking branch 'upstream/release-57' into 20240717-sync-release-57

Ivan Dyachkov 1 년 전
부모
커밋
292b331064
87개의 변경된 파일1574개의 추가작업 그리고 302개의 파일을 삭제
  1. 2 2
      apps/emqx/rebar.config
  2. 147 5
      apps/emqx/src/emqx_banned.erl
  3. 19 6
      apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl
  4. 5 1
      apps/emqx/src/emqx_passwd.erl
  5. 21 1
      apps/emqx/src/emqx_schema.erl
  6. 4 0
      apps/emqx/test/data/banned/error.csv
  7. 3 0
      apps/emqx/test/data/banned/full.csv
  8. 3 0
      apps/emqx/test/data/banned/full2.csv
  9. 3 0
      apps/emqx/test/data/banned/omitted.csv
  10. 3 0
      apps/emqx/test/data/banned/optional.csv
  11. 53 0
      apps/emqx/test/emqx_banned_SUITE.erl
  12. 15 1
      apps/emqx/test/emqx_passwd_SUITE.erl
  13. 2 2
      apps/emqx/test/emqx_ratelimiter_SUITE.erl
  14. 8 1
      apps/emqx_auth/src/emqx_authz/emqx_authz_api_sources.erl
  15. 2 1
      apps/emqx_auth/src/emqx_authz/emqx_authz_rule.erl
  16. 22 1
      apps/emqx_auth/src/emqx_authz/emqx_authz_rule_raw.erl
  17. 1 0
      apps/emqx_auth/src/emqx_authz/emqx_authz_schema.erl
  18. 3 0
      apps/emqx_auth/test/data/bad_public_key_file.pem
  19. 1 1
      apps/emqx_auth_http/src/emqx_auth_http.app.src
  20. 86 13
      apps/emqx_auth_http/src/emqx_authn_http.erl
  21. 163 0
      apps/emqx_auth_http/test/emqx_authn_http_SUITE.erl
  22. 44 3
      apps/emqx_auth_http/test/emqx_authz_http_SUITE.erl
  23. 14 1
      apps/emqx_auth_jwt/src/emqx_authn_jwks_client.erl
  24. 49 35
      apps/emqx_auth_jwt/src/emqx_authn_jwt.erl
  25. 32 0
      apps/emqx_auth_jwt/src/emqx_authn_jwt_schema.erl
  26. 161 1
      apps/emqx_auth_jwt/test/emqx_authn_jwt_SUITE.erl
  27. 1 1
      apps/emqx_bridge/test/emqx_bridge_SUITE.erl
  28. 2 2
      apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
  29. 3 4
      apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl
  30. 3 2
      apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl
  31. 1 1
      apps/emqx_connector/test/emqx_connector_SUITE.erl
  32. 1 1
      apps/emqx_connector/test/emqx_connector_api_SUITE.erl
  33. 6 2
      apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc.erl
  34. 6 1
      apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc_session.erl
  35. 46 6
      apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl
  36. 18 1
      apps/emqx_gateway/src/emqx_gateway_schema.erl
  37. 8 2
      apps/emqx_gateway/src/emqx_gateway_utils.erl
  38. 1 0
      apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl
  39. 4 2
      apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl
  40. 10 31
      apps/emqx_gateway_coap/src/emqx_coap_channel.erl
  41. 67 0
      apps/emqx_gateway_coap/src/emqx_coap_proxy_conn.erl
  42. 14 3
      apps/emqx_gateway_coap/src/emqx_gateway_coap.erl
  43. 22 12
      apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl
  44. 2 1
      apps/emqx_gateway_coap/test/emqx_coap_api_SUITE.erl
  45. 1 1
      apps/emqx_management/src/emqx_mgmt_api_banned.erl
  46. 29 0
      apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl
  47. 8 0
      apps/emqx_management/test/emqx_mgmt_api_test_util.erl
  48. 24 28
      apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl
  49. 2 0
      apps/emqx_message_transformation/src/emqx_message_transformation_schema.erl
  50. 13 0
      apps/emqx_message_transformation/test/emqx_message_transformation_tests.erl
  51. 14 3
      apps/emqx_plugins/test/emqx_plugins_SUITE.erl
  52. 0 1
      apps/emqx_prometheus/.gitignore
  53. 15 17
      apps/emqx_prometheus/src/emqx_prometheus.erl
  54. 21 0
      apps/emqx_prometheus/test/data/cert.crt
  55. 14 0
      apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl
  56. 122 60
      apps/emqx_resource/src/emqx_resource_manager.erl
  57. 21 9
      apps/emqx_schema_registry/src/emqx_schema_registry.erl
  58. 9 1
      apps/emqx_schema_registry/test/emqx_schema_registry_http_api_SUITE.erl
  59. 1 1
      apps/emqx_schema_validation/src/emqx_schema_validation.app.src
  60. 2 0
      apps/emqx_schema_validation/src/emqx_schema_validation_schema.erl
  61. 13 0
      apps/emqx_schema_validation/test/emqx_schema_validation_tests.erl
  62. 3 1
      apps/emqx_utils/src/emqx_utils.erl
  63. 54 5
      apps/emqx_utils/src/emqx_utils_stream.erl
  64. 16 0
      apps/emqx_utils/test/emqx_utils_SUITE.erl
  65. 1 0
      changes/ce/feat-13436.en.md
  66. 1 0
      changes/ce/fix-13375.en.md
  67. 1 0
      changes/ce/fix-13398.en.md
  68. 1 0
      changes/ce/fix-13403.en.md
  69. 1 0
      changes/ce/fix-13408.en.md
  70. 1 0
      changes/ce/fix-13419.en.md
  71. 1 0
      changes/ce/fix-13422.en.md
  72. 1 0
      changes/ce/fix-13442.en.md
  73. 1 0
      changes/ce/perf-13441.en.md
  74. 1 0
      changes/ee/breaking-13420.en.md
  75. 17 0
      changes/ee/feat-13386.en.md
  76. 1 0
      changes/ee/fix-13420.en.md
  77. 1 0
      changes/fix-13412.en.md
  78. 1 0
      changes/fix-13432.en.md
  79. 3 3
      deploy/docker/Dockerfile
  80. 2 2
      mix.exs
  81. 2 2
      rebar.config
  82. 5 0
      rel/i18n/emqx_authn_jwt_schema.hocon
  83. 5 0
      rel/i18n/emqx_authz_schema.hocon
  84. 9 0
      rel/i18n/emqx_gateway_schema.hocon
  85. 19 1
      rel/i18n/emqx_schema.hocon
  86. 34 18
      scripts/ui-tests/dashboard_test.py
  87. 2 2
      scripts/ui-tests/docker-compose.yaml

+ 2 - 2
apps/emqx/rebar.config

@@ -28,10 +28,10 @@
     {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}},
     {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
     {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
-    {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}},
+    {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.3"}}},
     {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}},
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
-    {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.42.2"}}},
+    {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.1"}}},
     {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
     {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
     {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},

+ 147 - 5
apps/emqx/src/emqx_banned.erl

@@ -16,6 +16,8 @@
 
 -module(emqx_banned).
 
+-feature(maybe_expr, enable).
+
 -behaviour(gen_server).
 -behaviour(emqx_db_backup).
 
@@ -49,6 +51,7 @@
     handle_call/3,
     handle_cast/2,
     handle_info/2,
+    handle_continue/2,
     terminate/2,
     code_change/3
 ]).
@@ -137,7 +140,7 @@ format(#banned{
         until => to_rfc3339(Until)
     }.
 
--spec parse(map()) -> emqx_types:banned() | {error, term()}.
+-spec parse(map()) -> {ok, emqx_types:banned()} | {error, term()}.
 parse(Params) ->
     case parse_who(Params) of
         {error, Reason} ->
@@ -149,13 +152,13 @@ parse(Params) ->
             Until = maps:get(<<"until">>, Params, At + ?EXPIRATION_TIME),
             case Until > erlang:system_time(second) of
                 true ->
-                    #banned{
+                    {ok, #banned{
                         who = Who,
                         by = By,
                         reason = Reason,
                         at = At,
                         until = Until
-                    };
+                    }};
                 false ->
                     ErrorReason =
                         io_lib:format("Cannot create expired banned, ~p to ~p", [At, Until]),
@@ -239,12 +242,139 @@ who(peerhost_net, CIDR) when is_tuple(CIDR) -> {peerhost_net, CIDR};
 who(peerhost_net, CIDR) when is_binary(CIDR) ->
     {peerhost_net, esockd_cidr:parse(binary_to_list(CIDR), true)}.
 
+%%--------------------------------------------------------------------
+%% Import From CSV
+%%--------------------------------------------------------------------
+init_from_csv(undefined) ->
+    ok;
+init_from_csv(File) ->
+    maybe
+        core ?= mria_rlog:role(),
+        '$end_of_table' ?= mnesia:dirty_first(?BANNED_RULE_TAB),
+        '$end_of_table' ?= mnesia:dirty_first(?BANNED_INDIVIDUAL_TAB),
+        {ok, Bin} ?= file:read_file(File),
+        Stream = emqx_utils_stream:csv(Bin, #{nullable => true, filter_null => true}),
+        {ok, List} ?= parse_stream(Stream),
+        import_from_stream(List),
+        ?SLOG(info, #{
+            msg => "load_banned_bootstrap_file_succeeded",
+            file => File
+        })
+    else
+        replicant ->
+            ok;
+        {Name, _} when
+            Name == peerhost;
+            Name == peerhost_net;
+            Name == clientid_re;
+            Name == username_re;
+            Name == clientid;
+            Name == username
+        ->
+            ok;
+        {error, Reason} = Error ->
+            ?SLOG(error, #{
+                msg => "load_banned_bootstrap_file_failed",
+                reason => Reason,
+                file => File
+            }),
+            Error
+    end.
+
+import_from_stream(Stream) ->
+    Groups = maps:groups_from_list(
+        fun(#banned{who = Who}) -> table(Who) end, Stream
+    ),
+    maps:foreach(
+        fun(Tab, Items) ->
+            Trans = fun() ->
+                lists:foreach(
+                    fun(Item) ->
+                        mnesia:write(Tab, Item, write)
+                    end,
+                    Items
+                )
+            end,
+
+            case trans(Trans) of
+                {ok, _} ->
+                    ?SLOG(info, #{
+                        msg => "import_banned_from_stream_succeeded",
+                        items => Items
+                    });
+                {error, Reason} ->
+                    ?SLOG(error, #{
+                        msg => "import_banned_from_stream_failed",
+                        reason => Reason,
+                        items => Items
+                    })
+            end
+        end,
+        Groups
+    ).
+
+parse_stream(Stream) ->
+    try
+        List = emqx_utils_stream:consume(Stream),
+        parse_stream(List, [], [])
+    catch
+        error:Reason ->
+            {error, Reason}
+    end.
+
+parse_stream([Item | List], Ok, Error) ->
+    maybe
+        {ok, Item1} ?= normalize_parse_item(Item),
+        {ok, Banned} ?= parse(Item1),
+        parse_stream(List, [Banned | Ok], Error)
+    else
+        {error, _} ->
+            parse_stream(List, Ok, [Item | Error])
+    end;
+parse_stream([], Ok, []) ->
+    {ok, Ok};
+parse_stream([], Ok, Error) ->
+    ?SLOG(warning, #{
+        msg => "invalid_banned_items",
+        items => Error
+    }),
+    {ok, Ok}.
+
+normalize_parse_item(#{<<"as">> := As} = Item) ->
+    ParseTime = fun(Name, Input) ->
+        maybe
+            #{Name := Time} ?= Input,
+            {ok, Epoch} ?= emqx_utils_calendar:to_epoch_second(emqx_utils_conv:str(Time)),
+            {ok, Input#{Name := Epoch}}
+        else
+            {error, _} = Error ->
+                Error;
+            NoTime when is_map(NoTime) ->
+                {ok, NoTime}
+        end
+    end,
+
+    maybe
+        {ok, Type} ?= emqx_utils:safe_to_existing_atom(As),
+        {ok, Item1} ?= ParseTime(<<"at">>, Item#{<<"as">> := Type}),
+        ParseTime(<<"until">>, Item1)
+    end;
+normalize_parse_item(_Item) ->
+    {error, invalid_item}.
+
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %%--------------------------------------------------------------------
 
 init([]) ->
-    {ok, ensure_expiry_timer(#{expiry_timer => undefined})}.
+    {ok, ensure_expiry_timer(#{expiry_timer => undefined}), {continue, init_from_csv}}.
+
+handle_continue(init_from_csv, State) ->
+    File = emqx_schema:naive_env_interpolation(
+        emqx:get_config([banned, bootstrap_file], undefined)
+    ),
+    _ = init_from_csv(File),
+    {noreply, State}.
 
 handle_call(Req, _From, State) ->
     ?SLOG(error, #{msg => "unexpected_call", call => Req}),
@@ -255,7 +385,7 @@ handle_cast(Msg, State) ->
     {noreply, State}.
 
 handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
-    _ = mria:transaction(?COMMON_SHARD, fun ?MODULE:expire_banned_items/1, [
+    _ = trans(fun ?MODULE:expire_banned_items/1, [
         erlang:system_time(second)
     ]),
     {noreply, ensure_expiry_timer(State), hibernate};
@@ -396,3 +526,15 @@ on_banned(_) ->
 
 all_rules() ->
     ets:tab2list(?BANNED_RULE_TAB).
+
+trans(Fun) ->
+    case mria:transaction(?COMMON_SHARD, Fun) of
+        {atomic, Res} -> {ok, Res};
+        {aborted, Reason} -> {error, Reason}
+    end.
+
+trans(Fun, Args) ->
+    case mria:transaction(?COMMON_SHARD, Fun, Args) of
+        {atomic, Res} -> {ok, Res};
+        {aborted, Reason} -> {error, Reason}
+    end.

+ 19 - 6
apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl

@@ -212,16 +212,29 @@ short_paths_fields() ->
 short_paths_fields(Importance) ->
     [
         {Name,
-            ?HOCON(rate_type(), #{
-                desc => ?DESC(Name),
-                required => false,
-                importance => Importance,
-                example => Example
-            })}
+            ?HOCON(
+                rate_type(),
+                maps:merge(
+                    #{
+                        desc => ?DESC(Name),
+                        required => false,
+                        importance => Importance,
+                        example => Example
+                    },
+                    short_paths_fields_extra(Name)
+                )
+            )}
      || {Name, Example} <-
             lists:zip(short_paths(), [<<"1000/s">>, <<"1000/s">>, <<"100MB/s">>])
     ].
 
+short_paths_fields_extra(max_conn_rate) ->
+    #{
+        default => infinity
+    };
+short_paths_fields_extra(_Name) ->
+    #{}.
+
 desc(limiter) ->
     "Settings for the rate limiter.";
 desc(node_opts) ->

+ 5 - 1
apps/emqx/src/emqx_passwd.erl

@@ -102,7 +102,11 @@ hash({SimpleHash, _Salt, disable}, Password) when is_binary(Password) ->
 hash({SimpleHash, Salt, prefix}, Password) when is_binary(Password), is_binary(Salt) ->
     hash_data(SimpleHash, <<Salt/binary, Password/binary>>);
 hash({SimpleHash, Salt, suffix}, Password) when is_binary(Password), is_binary(Salt) ->
-    hash_data(SimpleHash, <<Password/binary, Salt/binary>>).
+    hash_data(SimpleHash, <<Password/binary, Salt/binary>>);
+hash({_SimpleHash, Salt, _SaltPos}, _Password) when not is_binary(Salt) ->
+    error({salt_not_string, Salt});
+hash({_SimpleHash, _Salt, _SaltPos}, Password) when not is_binary(Password) ->
+    error({password_not_string, Password}).
 
 -spec hash_data(hash_type(), binary()) -> binary().
 hash_data(plain, Data) when is_binary(Data) ->

+ 21 - 1
apps/emqx/src/emqx_schema.erl

@@ -63,6 +63,7 @@
 -type json_binary() :: binary().
 -type template() :: binary().
 -type template_str() :: string().
+-type binary_kv() :: #{binary() => binary()}.
 
 -typerefl_from_string({duration/0, emqx_schema, to_duration}).
 -typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}).
@@ -167,7 +168,8 @@
     json_binary/0,
     port_number/0,
     template/0,
-    template_str/0
+    template_str/0,
+    binary_kv/0
 ]).
 
 -export([namespace/0, roots/0, roots/1, fields/1, desc/1, tags/0]).
@@ -319,6 +321,11 @@ roots(low) ->
             sc(
                 ref("crl_cache"),
                 #{importance => ?IMPORTANCE_HIDDEN}
+            )},
+        {banned,
+            sc(
+                ref("banned"),
+                #{importance => ?IMPORTANCE_HIDDEN}
             )}
     ].
 
@@ -1762,6 +1769,17 @@ fields("client_attrs_init") ->
                 desc => ?DESC("client_attrs_init_set_as_attr"),
                 validator => fun restricted_string/1
             })}
+    ];
+fields("banned") ->
+    [
+        {bootstrap_file,
+            sc(
+                binary(),
+                #{
+                    desc => ?DESC("banned_bootstrap_file"),
+                    require => false
+                }
+            )}
     ].
 
 compile_variform(undefined, _Opts) ->
@@ -2101,6 +2119,8 @@ desc(durable_storage) ->
     ?DESC(durable_storage);
 desc("client_attrs_init") ->
     ?DESC(client_attrs_init);
+desc("banned") ->
+    "Banned .";
 desc(_) ->
     undefined.
 

+ 4 - 0
apps/emqx/test/data/banned/error.csv

@@ -0,0 +1,4 @@
+as,who,reason,at,until,by
+clientid,c1,right,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot
+username,u1,reason 1,abc,2025-10-25T21:53:47+08:00,boot
+usernamx,u2,reason 2,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot

+ 3 - 0
apps/emqx/test/data/banned/full.csv

@@ -0,0 +1,3 @@
+as,who,reason,at,until,by
+clientid,c1,reason 1,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot
+username,u1,reason 2,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot

+ 3 - 0
apps/emqx/test/data/banned/full2.csv

@@ -0,0 +1,3 @@
+as,who,reason,at,until,by
+clientid,c2,reason 1,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot
+username,u2,reason 2,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot

+ 3 - 0
apps/emqx/test/data/banned/omitted.csv

@@ -0,0 +1,3 @@
+as,who,reason,at,until,by
+clientid,c1,,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,
+username,u1,,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,

+ 3 - 0
apps/emqx/test/data/banned/optional.csv

@@ -0,0 +1,3 @@
+as,who
+clientid,c1
+username,u1

+ 53 - 0
apps/emqx/test/emqx_banned_SUITE.erl

@@ -254,6 +254,45 @@ t_session_taken(_) ->
     {ok, #{}, [0]} = emqtt:unsubscribe(C3, Topic),
     ok = emqtt:disconnect(C3).
 
+t_full_bootstrap_file(_) ->
+    emqx_banned:clear(),
+    ?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"full.csv">>))),
+    FullDatas = lists:sort([
+        {banned, {username, <<"u1">>}, <<"boot">>, <<"reason 2">>, 1635170027, 1761400427},
+        {banned, {clientid, <<"c1">>}, <<"boot">>, <<"reason 1">>, 1635170027, 1761400427}
+    ]),
+    ?assertMatch(FullDatas, lists:sort(get_banned_list())),
+
+    ?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"full2.csv">>))),
+    ?assertMatch(FullDatas, lists:sort(get_banned_list())),
+    ok.
+
+t_optional_bootstrap_file(_) ->
+    emqx_banned:clear(),
+    ?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"optional.csv">>))),
+    Keys = lists:sort([{username, <<"u1">>}, {clientid, <<"c1">>}]),
+    ?assertMatch(Keys, lists:sort([element(2, Data) || Data <- get_banned_list()])),
+    ok.
+
+t_omitted_bootstrap_file(_) ->
+    emqx_banned:clear(),
+    ?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"omitted.csv">>))),
+    Keys = lists:sort([{username, <<"u1">>}, {clientid, <<"c1">>}]),
+    ?assertMatch(Keys, lists:sort([element(2, Data) || Data <- get_banned_list()])),
+    ok.
+
+t_error_bootstrap_file(_) ->
+    emqx_banned:clear(),
+    ?assertEqual(
+        {error, enoent}, emqx_banned:init_from_csv(mk_bootstrap_file(<<"not_exists.csv">>))
+    ),
+    ?assertEqual(
+        ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"error.csv">>))
+    ),
+    Keys = [{clientid, <<"c1">>}],
+    ?assertMatch(Keys, [element(2, Data) || Data <- get_banned_list()]),
+    ok.
+
 receive_messages(Count) ->
     receive_messages(Count, []).
 receive_messages(0, Msgs) ->
@@ -269,3 +308,17 @@ receive_messages(Count, Msgs) ->
     after 1200 ->
         Msgs
     end.
+
+mk_bootstrap_file(File) ->
+    Dir = code:lib_dir(emqx, test),
+    filename:join([Dir, <<"data/banned">>, File]).
+
+get_banned_list() ->
+    Tabs = emqx_banned:tables(),
+    lists:foldl(
+        fun(Tab, Acc) ->
+            Acc ++ ets:tab2list(Tab)
+        end,
+        [],
+        Tabs
+    ).

+ 15 - 1
apps/emqx/test/emqx_passwd_SUITE.erl

@@ -124,4 +124,18 @@ t_hash(_) ->
     false = emqx_passwd:check_pass({pbkdf2, sha, Pbkdf2Salt, 2, BadDKlen}, Pbkdf2, Password),
 
     %% Invalid derived_length, pbkdf2 fails
-    ?assertException(error, _, emqx_passwd:hash({pbkdf2, sha, Pbkdf2Salt, 2, BadDKlen}, Password)).
+    ?assertException(error, _, emqx_passwd:hash({pbkdf2, sha, Pbkdf2Salt, 2, BadDKlen}, Password)),
+
+    %% invalid salt (not binary)
+    ?assertException(
+        error,
+        {salt_not_string, false},
+        emqx_passwd:hash({sha256, false, suffix}, Password)
+    ),
+
+    %% invalid password (not binary)
+    ?assertException(
+        error,
+        {password_not_string, bad_password_type},
+        emqx_passwd:hash({sha256, Salt, suffix}, bad_password_type)
+    ).

+ 2 - 2
apps/emqx/test/emqx_ratelimiter_SUITE.erl

@@ -816,8 +816,8 @@ t_no_limiter_for_listener(_) ->
     CfgStr = <<>>,
     ok = emqx_common_test_helpers:load_config(emqx_schema, CfgStr),
     ListenerOpt = emqx:get_config([listeners, tcp, default]),
-    ?assertEqual(
-        undefined,
+    ?assertMatch(
+        #{connection := #{rate := infinity}},
         emqx_limiter_utils:get_listener_opts(ListenerOpt)
     ).
 

+ 8 - 1
apps/emqx_auth/src/emqx_authz/emqx_authz_api_sources.erl

@@ -470,7 +470,13 @@ make_result_map(ResList) ->
     lists:foldl(Fun, {maps:new(), maps:new(), maps:new(), maps:new()}, ResList).
 
 restructure_map(#{
-    counters := #{deny := Failed, total := Total, allow := Succ, nomatch := Nomatch},
+    counters := #{
+        ignore := Ignore,
+        deny := Failed,
+        total := Total,
+        allow := Succ,
+        nomatch := Nomatch
+    },
     rate := #{total := #{current := Rate, last5m := Rate5m, max := RateMax}}
 }) ->
     #{
@@ -478,6 +484,7 @@ restructure_map(#{
         allow => Succ,
         deny => Failed,
         nomatch => Nomatch,
+        ignore => Ignore,
         rate => Rate,
         rate_last5m => Rate5m,
         rate_max => RateMax

+ 2 - 1
apps/emqx_auth/src/emqx_authz/emqx_authz_rule.erl

@@ -68,7 +68,8 @@
 -export_type([
     permission_resolution/0,
     action_condition/0,
-    topic_condition/0
+    topic_condition/0,
+    rule/0
 ]).
 
 %%--------------------------------------------------------------------

+ 22 - 1
apps/emqx_auth/src/emqx_authz/emqx_authz_rule_raw.erl

@@ -21,7 +21,7 @@
 
 -module(emqx_authz_rule_raw).
 
--export([parse_rule/1, format_rule/1]).
+-export([parse_rule/1, parse_and_compile_rules/1, format_rule/1]).
 
 -include("emqx_authz.hrl").
 
@@ -55,6 +55,27 @@
 %% API
 %%--------------------------------------------------------------------
 
+%% @doc Parse and compile raw ACL rules.
+%% If any bad rule is found, `{bad_acl_rule, ..}' is thrown.
+-spec parse_and_compile_rules([rule_raw()]) -> [emqx_authz_rule:rule()].
+parse_and_compile_rules(Rules) ->
+    lists:map(
+        fun(Rule) ->
+            case parse_rule(Rule) of
+                {ok, {Permission, Action, Topics}} ->
+                    try
+                        emqx_authz_rule:compile({Permission, all, Action, Topics})
+                    catch
+                        throw:Reason ->
+                            throw({bad_acl_rule, Reason})
+                    end;
+                {error, Reason} ->
+                    throw({bad_acl_rule, Reason})
+            end
+        end,
+        Rules
+    ).
+
 -spec parse_rule(rule_raw()) ->
     {ok, {
         emqx_authz_rule:permission_resolution_precompile(),

+ 1 - 0
apps/emqx_auth/src/emqx_authz/emqx_authz_schema.erl

@@ -88,6 +88,7 @@ fields("metrics_status_fields") ->
 fields("metrics") ->
     [
         {"total", ?HOCON(integer(), #{desc => ?DESC("metrics_total")})},
+        {"ignore", ?HOCON(integer(), #{desc => ?DESC("ignore")})},
         {"allow", ?HOCON(integer(), #{desc => ?DESC("allow")})},
         {"deny", ?HOCON(integer(), #{desc => ?DESC("deny")})},
         {"nomatch", ?HOCON(float(), #{desc => ?DESC("nomatch")})}

+ 3 - 0
apps/emqx_auth/test/data/bad_public_key_file.pem

@@ -0,0 +1,3 @@
+-----BEGIN PUBLIC KEY-----
+XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
+-----END PUBLIC KEY-----

+ 1 - 1
apps/emqx_auth_http/src/emqx_auth_http.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_auth_http, [
     {description, "EMQX External HTTP API Authentication and Authorization"},
-    {vsn, "0.2.3"},
+    {vsn, "0.3.0"},
     {registered, []},
     {mod, {emqx_auth_http_app, []}},
     {applications, [

+ 86 - 13
apps/emqx_auth_http/src/emqx_authn_http.erl

@@ -186,19 +186,7 @@ handle_response(Headers, Body) ->
     ContentType = proplists:get_value(<<"content-type">>, Headers),
     case safely_parse_body(ContentType, Body) of
         {ok, NBody} ->
-            case maps:get(<<"result">>, NBody, <<"ignore">>) of
-                <<"allow">> ->
-                    IsSuperuser = emqx_authn_utils:is_superuser(NBody),
-                    Attrs = emqx_authn_utils:client_attrs(NBody),
-                    Result = maps:merge(IsSuperuser, Attrs),
-                    {ok, Result};
-                <<"deny">> ->
-                    {error, not_authorized};
-                <<"ignore">> ->
-                    ignore;
-                _ ->
-                    ignore
-            end;
+            body_to_auth_data(NBody);
         {error, Reason} ->
             ?TRACE_AUTHN_PROVIDER(
                 error,
@@ -208,6 +196,91 @@ handle_response(Headers, Body) ->
             ignore
     end.
 
+body_to_auth_data(Body) ->
+    case maps:get(<<"result">>, Body, <<"ignore">>) of
+        <<"allow">> ->
+            IsSuperuser = emqx_authn_utils:is_superuser(Body),
+            Attrs = emqx_authn_utils:client_attrs(Body),
+            try
+                ExpireAt = expire_at(Body),
+                ACL = acl(ExpireAt, Body),
+                Result = merge_maps([ExpireAt, IsSuperuser, ACL, Attrs]),
+                {ok, Result}
+            catch
+                throw:{bad_acl_rule, Reason} ->
+                    %% it's a invalid token, so ok to log
+                    ?TRACE_AUTHN_PROVIDER("bad_acl_rule", Reason#{http_body => Body}),
+                    {error, bad_username_or_password};
+                throw:Reason ->
+                    ?TRACE_AUTHN_PROVIDER("bad_response_body", Reason#{http_body => Body}),
+                    {error, bad_username_or_password}
+            end;
+        <<"deny">> ->
+            {error, not_authorized};
+        <<"ignore">> ->
+            ignore;
+        _ ->
+            ignore
+    end.
+
+merge_maps([]) -> #{};
+merge_maps([Map | Maps]) -> maps:merge(Map, merge_maps(Maps)).
+
+%% Return either an empty map, or a map with `expire_at` at millisecond precision
+%% Millisecond precision timestamp is required by `auth_expire_at`
+%% emqx_channel:schedule_connection_expire/1
+expire_at(Body) ->
+    case expire_sec(Body) of
+        undefined ->
+            #{};
+        Sec ->
+            #{expire_at => erlang:convert_time_unit(Sec, second, millisecond)}
+    end.
+
+expire_sec(#{<<"expire_at">> := ExpireTime}) when is_integer(ExpireTime) ->
+    Now = erlang:system_time(second),
+    NowMs = erlang:convert_time_unit(Now, second, millisecond),
+    case ExpireTime < Now of
+        true ->
+            throw(#{
+                cause => "'expire_at' is in the past.",
+                system_time => Now,
+                expire_at => ExpireTime
+            });
+        false when ExpireTime > (NowMs div 2) ->
+            throw(#{
+                cause => "'expire_at' does not appear to be a Unix epoch time in seconds.",
+                system_time => Now,
+                expire_at => ExpireTime
+            });
+        false ->
+            ExpireTime
+    end;
+expire_sec(#{<<"expire_at">> := _}) ->
+    throw(#{cause => "'expire_at' is not an integer (Unix epoch time in seconds)."});
+expire_sec(_) ->
+    undefined.
+
+acl(#{expire_at := ExpireTimeMs}, #{<<"acl">> := Rules}) ->
+    #{
+        acl => #{
+            source_for_logging => http,
+            rules => emqx_authz_rule_raw:parse_and_compile_rules(Rules),
+            %% It's seconds level precision (like JWT) for authz
+            %% see emqx_authz_client_info:check/1
+            expire => erlang:convert_time_unit(ExpireTimeMs, millisecond, second)
+        }
+    };
+acl(_NoExpire, #{<<"acl">> := Rules}) ->
+    #{
+        acl => #{
+            source_for_logging => http,
+            rules => emqx_authz_rule_raw:parse_and_compile_rules(Rules)
+        }
+    };
+acl(_, _) ->
+    #{}.
+
 safely_parse_body(ContentType, Body) ->
     try
         parse_body(ContentType, Body)

+ 163 - 0
apps/emqx_auth_http/test/emqx_authn_http_SUITE.erl

@@ -23,6 +23,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("emqx/include/emqx_placeholder.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
 
 -define(PATH, [?CONF_NS_ATOM]).
 
@@ -49,6 +50,21 @@
     })
 ).
 
+-define(SERVER_RESPONSE_WITH_ACL_JSON(ACL),
+    emqx_utils_json:encode(#{
+        result => allow,
+        acl => ACL
+    })
+).
+
+-define(SERVER_RESPONSE_WITH_ACL_JSON(ACL, Expire),
+    emqx_utils_json:encode(#{
+        result => allow,
+        acl => ACL,
+        expire_at => Expire
+    })
+).
+
 -define(SERVER_RESPONSE_URLENCODE(Result, IsSuperuser),
     list_to_binary(
         "result=" ++
@@ -510,6 +526,129 @@ test_ignore_allow_deny({ExpectedValue, ServerResponse}) ->
             )
     end.
 
+t_acl(_Config) ->
+    ACL = acl_rules(),
+    Config = raw_http_auth_config(),
+    {ok, _} = emqx:update_config(
+        ?PATH,
+        {create_authenticator, ?GLOBAL, Config}
+    ),
+    ok = emqx_authn_http_test_server:set_handler(
+        fun(Req0, State) ->
+            Req = cowboy_req:reply(
+                200,
+                #{<<"content-type">> => <<"application/json">>},
+                ?SERVER_RESPONSE_WITH_ACL_JSON(ACL),
+                Req0
+            ),
+            {ok, Req, State}
+        end
+    ),
+    {ok, C} = emqtt:start_link(
+        [
+            {clean_start, true},
+            {proto_ver, v5},
+            {clientid, <<"clientid">>},
+            {username, <<"username">>},
+            {password, <<"password">>}
+        ]
+    ),
+    {ok, _} = emqtt:connect(C),
+    Cases = [
+        {allow, <<"http-authn-acl/#">>},
+        {deny, <<"http-authn-acl/1">>},
+        {deny, <<"t/#">>}
+    ],
+    try
+        lists:foreach(
+            fun(Case) ->
+                test_acl(Case, C)
+            end,
+            Cases
+        )
+    after
+        ok = emqtt:disconnect(C)
+    end.
+
+t_auth_expire(_Config) ->
+    ACL = acl_rules(),
+    Config = raw_http_auth_config(),
+    {ok, _} = emqx:update_config(
+        ?PATH,
+        {create_authenticator, ?GLOBAL, Config}
+    ),
+    ExpireSec = 3,
+    WaitTime = timer:seconds(ExpireSec + 1),
+    Tests = [
+        {<<"ok-to-connect-but-expire-on-pub">>, erlang:system_time(second) + ExpireSec, fun(C) ->
+            {ok, _} = emqtt:connect(C),
+            receive
+                {'DOWN', _Ref, process, C, Reason} ->
+                    ?assertMatch({disconnected, ?RC_NOT_AUTHORIZED, _}, Reason)
+            after WaitTime ->
+                error(timeout)
+            end
+        end},
+        {<<"past">>, erlang:system_time(second) - 1, fun(C) ->
+            ?assertMatch({error, {bad_username_or_password, _}}, emqtt:connect(C)),
+            receive
+                {'DOWN', _Ref, process, C, Reason} ->
+                    ?assertMatch({shutdown, bad_username_or_password}, Reason)
+            end
+        end},
+        {<<"invalid">>, erlang:system_time(millisecond), fun(C) ->
+            ?assertMatch({error, {bad_username_or_password, _}}, emqtt:connect(C)),
+            receive
+                {'DOWN', _Ref, process, C, Reason} ->
+                    ?assertMatch({shutdown, bad_username_or_password}, Reason)
+            end
+        end}
+    ],
+    ok = emqx_authn_http_test_server:set_handler(
+        fun(Req0, State) ->
+            QS = cowboy_req:parse_qs(Req0),
+            {_, Username} = lists:keyfind(<<"username">>, 1, QS),
+            {_, ExpireTime, _} = lists:keyfind(Username, 1, Tests),
+            Req = cowboy_req:reply(
+                200,
+                #{<<"content-type">> => <<"application/json">>},
+                ?SERVER_RESPONSE_WITH_ACL_JSON(ACL, ExpireTime),
+                Req0
+            ),
+            {ok, Req, State}
+        end
+    ),
+    lists:foreach(fun test_auth_expire/1, Tests).
+
+test_auth_expire({Username, _ExpireTime, TestFn}) ->
+    {ok, C} = emqtt:start_link(
+        [
+            {clean_start, true},
+            {proto_ver, v5},
+            {clientid, <<"clientid">>},
+            {username, Username},
+            {password, <<"password">>}
+        ]
+    ),
+    _ = monitor(process, C),
+    unlink(C),
+    try
+        TestFn(C)
+    after
+        [ok = emqtt:disconnect(C) || is_process_alive(C)]
+    end.
+
+test_acl({allow, Topic}, C) ->
+    ?assertMatch(
+        {ok, #{}, [0]},
+        emqtt:subscribe(C, Topic)
+    );
+test_acl({deny, Topic}, C) ->
+    ?assertMatch(
+        {ok, #{}, [?RC_NOT_AUTHORIZED]},
+        emqtt:subscribe(C, Topic)
+    ).
+
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------
@@ -874,3 +1013,27 @@ to_list(B) when is_binary(B) ->
     binary_to_list(B);
 to_list(L) when is_list(L) ->
     L.
+
+acl_rules() ->
+    [
+        #{
+            <<"permission">> => <<"allow">>,
+            <<"action">> => <<"pub">>,
+            <<"topics">> => [
+                <<"http-authn-acl/1">>
+            ]
+        },
+        #{
+            <<"permission">> => <<"allow">>,
+            <<"action">> => <<"sub">>,
+            <<"topics">> =>
+                [
+                    <<"eq http-authn-acl/#">>
+                ]
+        },
+        #{
+            <<"permission">> => <<"deny">>,
+            <<"action">> => <<"all">>,
+            <<"topics">> => [<<"#">>]
+        }
+    ].

+ 44 - 3
apps/emqx_auth_http/test/emqx_authz_http_SUITE.erl

@@ -48,7 +48,7 @@ init_per_suite(Config) ->
             emqx_auth,
             emqx_auth_http
         ],
-        #{work_dir => ?config(priv_dir, Config)}
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
     ),
     [{suite_apps, Apps} | Config].
 
@@ -56,12 +56,22 @@ end_per_suite(_Config) ->
     ok = emqx_authz_test_lib:restore_authorizers(),
     emqx_cth_suite:stop(?config(suite_apps, _Config)).
 
-init_per_testcase(_Case, Config) ->
+init_per_testcase(t_bad_response = TestCase, Config) ->
+    TCApps = emqx_cth_suite:start_apps(
+        [emqx_management, emqx_mgmt_api_test_util:emqx_dashboard()],
+        #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
+    ),
+    init_per_testcase(common, [{tc_apps, TCApps} | Config]);
+init_per_testcase(_TestCase, Config) ->
     ok = emqx_authz_test_lib:reset_authorizers(),
     {ok, _} = emqx_authz_http_test_server:start_link(?HTTP_PORT, ?HTTP_PATH),
     Config.
 
-end_per_testcase(_Case, _Config) ->
+end_per_testcase(t_bad_response, Config) ->
+    TCApps = ?config(tc_apps, Config),
+    emqx_cth_suite:stop_apps(TCApps),
+    end_per_testcase(common, Config);
+end_per_testcase(_TestCase, _Config) ->
     _ = emqx_authz:set_feature_available(rich_actions, true),
     try
         ok = emqx_authz_http_test_server:stop()
@@ -589,6 +599,29 @@ t_bad_response(_Config) ->
         },
         get_metrics()
     ),
+    ?assertMatch(
+        {200, #{
+            <<"metrics">> := #{
+                <<"ignore">> := 1,
+                <<"nomatch">> := 0,
+                <<"allow">> := 0,
+                <<"deny">> := 0,
+                <<"total">> := 1
+            },
+            <<"node_metrics">> := [
+                #{
+                    <<"metrics">> := #{
+                        <<"ignore">> := 1,
+                        <<"nomatch">> := 0,
+                        <<"allow">> := 0,
+                        <<"deny">> := 0,
+                        <<"total">> := 1
+                    }
+                }
+            ]
+        }},
+        get_status_api()
+    ),
     ok.
 
 t_no_value_for_placeholder(_Config) ->
@@ -806,3 +839,11 @@ get_metrics() ->
             'authorization.nomatch'
         ]
     ).
+
+get_status_api() ->
+    Path = emqx_mgmt_api_test_util:uri(["authorization", "sources", "http", "status"]),
+    Auth = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    Res0 = emqx_mgmt_api_test_util:request_api(get, Path, _QParams = [], Auth, _Body = [], Opts),
+    {Status, RawBody} = emqx_mgmt_api_test_util:simplify_result(Res0),
+    {Status, emqx_utils_json:decode(RawBody, [return_maps])}.

+ 14 - 1
apps/emqx_auth_jwt/src/emqx_authn_jwks_client.erl

@@ -133,11 +133,13 @@ code_change(_OldVsn, State, _Extra) ->
 
 handle_options(#{
     endpoint := Endpoint,
+    headers := Headers,
     refresh_interval := RefreshInterval0,
     ssl_opts := SSLOpts
 }) ->
     #{
         endpoint => Endpoint,
+        headers => to_httpc_headers(Headers),
         refresh_interval => limit_refresh_interval(RefreshInterval0),
         ssl_opts => maps:to_list(SSLOpts),
         jwks => [],
@@ -147,6 +149,7 @@ handle_options(#{
 refresh_jwks(
     #{
         endpoint := Endpoint,
+        headers := Headers,
         ssl_opts := SSLOpts
     } = State
 ) ->
@@ -159,7 +162,7 @@ refresh_jwks(
         case
             httpc:request(
                 get,
-                {Endpoint, [{"Accept", "application/json"}]},
+                {Endpoint, Headers},
                 HTTPOpts,
                 [{body_format, binary}, {sync, false}, {receiver, self()}]
             )
@@ -185,6 +188,9 @@ limit_refresh_interval(Interval) when Interval < 10 ->
 limit_refresh_interval(Interval) ->
     Interval.
 
+to_httpc_headers(Headers) ->
+    [{binary_to_list(bin(K)), V} || {K, V} <- maps:to_list(Headers)].
+
 cancel_http_request(#{request_id := undefined} = State) ->
     State;
 cancel_http_request(#{request_id := RequestID} = State) ->
@@ -195,3 +201,10 @@ cancel_http_request(#{request_id := RequestID} = State) ->
         ok
     end,
     State#{request_id => undefined}.
+
+bin(List) when is_list(List) ->
+    unicode:characters_to_binary(List, utf8);
+bin(Atom) when is_atom(Atom) ->
+    erlang:atom_to_binary(Atom);
+bin(Bin) when is_binary(Bin) ->
+    Bin.

+ 49 - 35
apps/emqx_auth_jwt/src/emqx_authn_jwt.erl

@@ -19,6 +19,7 @@
 -include_lib("emqx_auth/include/emqx_authn.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/emqx_placeholder.hrl").
+-include_lib("jose/include/jose_jwk.hrl").
 
 -export([
     create/2,
@@ -40,7 +41,7 @@ create(_AuthenticatorID, Config) ->
     create(Config).
 
 create(#{verify_claims := VerifyClaims} = Config) ->
-    create2(Config#{verify_claims => handle_verify_claims(VerifyClaims)}).
+    do_create(Config#{verify_claims => handle_verify_claims(VerifyClaims)}).
 
 update(
     #{use_jwks := false} = Config,
@@ -85,6 +86,7 @@ authenticate(
     }
 ) ->
     JWT = maps:get(From, Credential),
+    %% XXX: Only supports single public key
     JWKs = [JWK],
     VerifyClaims = render_expected(VerifyClaims0, Credential),
     verify(JWT, JWKs, VerifyClaims, AclClaimName, DisconnectAfterExpire);
@@ -121,7 +123,7 @@ destroy(_) ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
-create2(#{
+do_create(#{
     use_jwks := false,
     algorithm := 'hmac-based',
     secret := Secret0,
@@ -144,24 +146,35 @@ create2(#{
                 from => From
             }}
     end;
-create2(#{
-    use_jwks := false,
-    algorithm := 'public-key',
-    public_key := PublicKey,
-    verify_claims := VerifyClaims,
-    disconnect_after_expire := DisconnectAfterExpire,
-    acl_claim_name := AclClaimName,
-    from := From
-}) ->
-    JWK = create_jwk_from_public_key(PublicKey),
-    {ok, #{
-        jwk => JWK,
-        verify_claims => VerifyClaims,
-        disconnect_after_expire => DisconnectAfterExpire,
-        acl_claim_name => AclClaimName,
-        from => From
-    }};
-create2(
+do_create(
+    #{
+        use_jwks := false,
+        algorithm := 'public-key',
+        public_key := PublicKey,
+        verify_claims := VerifyClaims,
+        disconnect_after_expire := DisconnectAfterExpire,
+        acl_claim_name := AclClaimName,
+        from := From
+    } = Config
+) ->
+    case
+        create_jwk_from_public_key(
+            maps:get(enable, Config, false),
+            PublicKey
+        )
+    of
+        {ok, JWK} ->
+            {ok, #{
+                jwk => JWK,
+                verify_claims => VerifyClaims,
+                disconnect_after_expire => DisconnectAfterExpire,
+                acl_claim_name => AclClaimName,
+                from => From
+            }};
+        {error, _Reason} = Err ->
+            Err
+    end;
+do_create(
     #{
         use_jwks := true,
         verify_claims := VerifyClaims,
@@ -185,9 +198,23 @@ create2(
         from => From
     }}.
 
-create_jwk_from_public_key(PublicKey) when
+create_jwk_from_public_key(true, PublicKey) when
     is_binary(PublicKey); is_list(PublicKey)
 ->
+    try do_create_jwk_from_public_key(PublicKey) of
+        %% XXX: Only supports single public key
+        #jose_jwk{} = Res ->
+            {ok, Res};
+        _ ->
+            {error, invalid_public_key}
+    catch
+        _:_ ->
+            {error, invalid_public_key}
+    end;
+create_jwk_from_public_key(false, _PublicKey) ->
+    {ok, []}.
+
+do_create_jwk_from_public_key(PublicKey) ->
     case filelib:is_file(PublicKey) of
         true ->
             jose_jwk:from_pem_file(PublicKey);
@@ -384,20 +411,7 @@ binary_to_number(Bin) ->
 parse_rules(Rules) when is_map(Rules) ->
     Rules;
 parse_rules(Rules) when is_list(Rules) ->
-    lists:map(fun parse_rule/1, Rules).
-
-parse_rule(Rule) ->
-    case emqx_authz_rule_raw:parse_rule(Rule) of
-        {ok, {Permission, Action, Topics}} ->
-            try
-                emqx_authz_rule:compile({Permission, all, Action, Topics})
-            catch
-                throw:Reason ->
-                    throw({bad_acl_rule, Reason})
-            end;
-        {error, Reason} ->
-            throw({bad_acl_rule, Reason})
-    end.
+    emqx_authz_rule_raw:parse_and_compile_rules(Rules).
 
 merge_maps([]) -> #{};
 merge_maps([Map | Maps]) -> maps:merge(Map, merge_maps(Maps)).

+ 32 - 0
apps/emqx_auth_jwt/src/emqx_authn_jwt_schema.erl

@@ -100,6 +100,15 @@ fields(jwt_jwks) ->
     [
         {use_jwks, sc(hoconsc:enum([true]), #{required => true, desc => ?DESC(use_jwks)})},
         {endpoint, fun endpoint/1},
+        {headers,
+            sc(
+                typerefl:alias("map", emqx_schema:binary_kv()),
+                #{
+                    default => #{<<"Accept">> => <<"application/json">>},
+                    validator => fun validate_headers/1,
+                    desc => ?DESC("jwks_headers")
+                }
+            )},
         {pool_size, fun emqx_connector_schema_lib:pool_size/1},
         {refresh_interval, fun refresh_interval/1},
         {ssl, #{
@@ -235,3 +244,26 @@ to_binary(B) when is_binary(B) ->
     B.
 
 sc(Type, Meta) -> hoconsc:mk(Type, Meta).
+
+validate_headers(undefined) ->
+    ok;
+validate_headers(Headers) ->
+    BadKeys0 =
+        lists:filter(
+            fun(K) ->
+                re:run(K, <<"[^-0-9a-zA-Z_ ]">>, [{capture, none}]) =:= match
+            end,
+            maps:keys(Headers)
+        ),
+    case BadKeys0 of
+        [] ->
+            ok;
+        _ ->
+            BadKeys = lists:join(", ", BadKeys0),
+            Msg0 = io_lib:format(
+                "headers should contain only characters matching [-0-9a-zA-Z_ ]; bad headers: ~s",
+                [BadKeys]
+            ),
+            Msg = iolist_to_binary(Msg0),
+            {error, Msg}
+    end.

+ 161 - 1
apps/emqx_auth_jwt/test/emqx_authn_jwt_SUITE.erl

@@ -22,18 +22,21 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx/include/asserts.hrl").
 
 -define(AUTHN_ID, <<"mechanism:jwt">>).
 
 -define(JWKS_PORT, 31333).
 -define(JWKS_PATH, "/jwks.json").
 
+-import(emqx_common_test_helpers, [on_exit/1]).
+
 all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
     Apps = emqx_cth_suite:start([emqx, emqx_conf, emqx_auth, emqx_auth_jwt], #{
-        work_dir => ?config(priv_dir, Config)
+        work_dir => emqx_cth_suite:work_dir(Config)
     }),
     [{apps, Apps} | Config].
 
@@ -41,6 +44,10 @@ end_per_suite(Config) ->
     ok = emqx_cth_suite:stop(?config(apps, Config)),
     ok.
 
+end_per_testcase(_TestCase, _Config) ->
+    emqx_common_test_helpers:call_janitor(),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Tests
 %%------------------------------------------------------------------------------
@@ -178,6 +185,7 @@ t_public_key(_) ->
         from => password,
         acl_claim_name => <<"acl">>,
         use_jwks => false,
+        enable => true,
         algorithm => 'public-key',
         public_key => PublicKey,
         verify_claims => [],
@@ -199,6 +207,51 @@ t_public_key(_) ->
     ?assertEqual(ok, emqx_authn_jwt:destroy(State)),
     ok.
 
+t_bad_public_keys(_) ->
+    BaseConfig = #{
+        mechanism => jwt,
+        from => password,
+        acl_claim_name => <<"acl">>,
+        use_jwks => false,
+        algorithm => 'public-key',
+        verify_claims => [],
+        disconnect_after_expire => false
+    },
+
+    %% try create with invalid public key
+    ?assertMatch(
+        {error, invalid_public_key},
+        emqx_authn_jwt:create(?AUTHN_ID, BaseConfig#{
+            enable => true,
+            public_key => <<"bad_public_key">>
+        })
+    ),
+
+    %% no such file
+    ?assertMatch(
+        {error, invalid_public_key},
+        emqx_authn_jwt:create(?AUTHN_ID, BaseConfig#{
+            enable => true,
+            public_key => data_file("bad_flie_path.pem")
+        })
+    ),
+
+    %% bad public key file content
+    ?assertMatch(
+        {error, invalid_public_key},
+        emqx_authn_jwt:create(?AUTHN_ID, BaseConfig#{
+            enable => true,
+            public_key => data_file("bad_public_key_file.pem")
+        })
+    ),
+
+    %% assume jwk authenticator is disabled
+    {ok, State} =
+        emqx_authn_jwt:create(?AUTHN_ID, BaseConfig#{public_key => <<"bad_public_key">>}),
+
+    ?assertEqual(ok, emqx_authn_jwt:destroy(State)),
+    ok.
+
 t_jwt_in_username(_) ->
     Secret = <<"abcdef">>,
     Config = #{
@@ -276,6 +329,7 @@ t_jwks_renewal(_Config) ->
         disconnect_after_expire => false,
         use_jwks => true,
         endpoint => "https://127.0.0.1:" ++ integer_to_list(?JWKS_PORT + 1) ++ ?JWKS_PATH,
+        headers => #{<<"Accept">> => <<"application/json">>},
         refresh_interval => 1000,
         pool_size => 1
     },
@@ -360,6 +414,102 @@ t_jwks_renewal(_Config) ->
     ?assertEqual(ok, emqx_authn_jwt:destroy(State2)),
     ok = emqx_authn_http_test_server:stop().
 
+t_jwks_custom_headers(_Config) ->
+    {ok, _} = emqx_authn_http_test_server:start_link(?JWKS_PORT, ?JWKS_PATH, server_ssl_opts()),
+    on_exit(fun() -> ok = emqx_authn_http_test_server:stop() end),
+    ok = emqx_authn_http_test_server:set_handler(jwks_handler_spy()),
+
+    PrivateKey = test_rsa_key(private),
+    Payload = #{
+        <<"username">> => <<"myuser">>,
+        <<"foo">> => <<"myuser">>,
+        <<"exp">> => erlang:system_time(second) + 10
+    },
+    Endpoint = iolist_to_binary("https://127.0.0.1:" ++ integer_to_list(?JWKS_PORT) ++ ?JWKS_PATH),
+    Config0 = #{
+        <<"mechanism">> => <<"jwt">>,
+        <<"use_jwks">> => true,
+        <<"from">> => <<"password">>,
+        <<"endpoint">> => Endpoint,
+        <<"headers">> => #{
+            <<"Accept">> => <<"application/json">>,
+            <<"Content-Type">> => <<>>,
+            <<"foo">> => <<"bar">>
+        },
+        <<"pool_size">> => 1,
+        <<"refresh_interval">> => 1_000,
+        <<"ssl">> => #{
+            <<"keyfile">> => cert_file("client.key"),
+            <<"certfile">> => cert_file("client.crt"),
+            <<"cacertfile">> => cert_file("ca.crt"),
+            <<"enable">> => true,
+            <<"verify">> => <<"verify_peer">>,
+            <<"server_name_indication">> => <<"authn-server">>
+        },
+        <<"verify_claims">> => #{<<"foo">> => <<"${username}">>}
+    },
+    {ok, Config} = hocon:binary(hocon_pp:do(Config0, #{})),
+    ChainName = 'mqtt:global',
+    AuthenticatorId = <<"jwt">>,
+    ?check_trace(
+        #{timetrap => 10_000},
+        begin
+            %% bad header keys
+            BadConfig1 = emqx_utils_maps:deep_put(
+                [<<"headers">>, <<"ça-va"/utf8>>], Config, <<"bien">>
+            ),
+            ?assertMatch(
+                {error, #{
+                    kind := validation_error,
+                    reason := <<"headers should contain only characters matching ", _/binary>>
+                }},
+                emqx_authn_api:update_config(
+                    [authentication],
+                    {create_authenticator, ChainName, BadConfig1}
+                )
+            ),
+            BadConfig2 = emqx_utils_maps:deep_put(
+                [<<"headers">>, <<"test_哈哈"/utf8>>],
+                Config,
+                <<"test_haha">>
+            ),
+            ?assertMatch(
+                {error, #{
+                    kind := validation_error,
+                    reason := <<"headers should contain only characters matching ", _/binary>>
+                }},
+                emqx_authn_api:update_config(
+                    [authentication],
+                    {create_authenticator, ChainName, BadConfig2}
+                )
+            ),
+            {{ok, _}, {ok, _}} =
+                ?wait_async_action(
+                    emqx_authn_api:update_config(
+                        [authentication],
+                        {create_authenticator, ChainName, Config}
+                    ),
+                    #{?snk_kind := jwks_endpoint_response},
+                    5_000
+                ),
+            ?assertReceive(
+                {http_request, #{
+                    headers := #{
+                        <<"accept">> := <<"application/json">>,
+                        <<"foo">> := <<"bar">>
+                    }
+                }}
+            ),
+            {ok, _} = emqx_authn_api:update_config(
+                [authentication],
+                {delete_authenticator, ChainName, AuthenticatorId}
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.
+
 t_verify_claims(_) ->
     Secret = <<"abcdef">>,
     Config0 = #{
@@ -613,6 +763,16 @@ jwks_handler(Req0, State) ->
     ),
     {ok, Req, State}.
 
+jwks_handler_spy() ->
+    TestPid = self(),
+    fun(Req, State) ->
+        ReqHeaders = cowboy_req:headers(Req),
+        ReqMap = #{headers => ReqHeaders},
+        ct:pal("jwks request:\n  ~p", [ReqMap]),
+        TestPid ! {http_request, ReqMap},
+        jwks_handler(Req, State)
+    end.
+
 test_rsa_key(public) ->
     data_file("public_key.pem");
 test_rsa_key(private) ->

+ 1 - 1
apps/emqx_bridge/test/emqx_bridge_SUITE.erl

@@ -216,7 +216,7 @@ t_create_with_bad_name(_Config) ->
     ok.
 
 t_create_with_bad_name_root(_Config) ->
-    BadBridgeName = <<"test_哈哈">>,
+    BadBridgeName = <<"test_哈哈"/utf8>>,
     BridgeConf = #{
         <<"bridge_mode">> => false,
         <<"clean_start">> => true,

+ 2 - 2
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -1431,7 +1431,7 @@ t_cluster_later_join_metrics(Config) ->
 t_create_with_bad_name(Config) ->
     Port = ?config(port, Config),
     URL1 = ?URL(Port, "path1"),
-    Name = <<"test_哈哈">>,
+    Name = <<"test_哈哈"/utf8>>,
     BadBridgeParams =
         emqx_utils_maps:deep_merge(
             ?HTTP_BRIDGE(URL1, Name),
@@ -1457,7 +1457,7 @@ t_create_with_bad_name(Config) ->
     ?assertMatch(
         #{
             <<"kind">> := <<"validation_error">>,
-            <<"reason">> := <<"Invalid name format.", _/binary>>
+            <<"reason">> := <<"invalid_map_key">>
         },
         Msg
     ),

+ 3 - 4
apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl

@@ -604,8 +604,7 @@ deprecated_config() ->
 t_name_too_long(_Config) ->
     LongName = list_to_binary(lists:duplicate(256, $a)),
     ?assertMatch(
-        {error,
-            {{_, 400, _}, _, #{<<"message">> := #{<<"reason">> := <<"Name is too long", _/binary>>}}}},
+        {error, {{_, 400, _}, _, #{<<"message">> := #{<<"reason">> := <<"invalid_map_key">>}}}},
         create_bridge_http_api_v1(#{name => LongName})
     ),
     ok.
@@ -942,7 +941,7 @@ t_scenario_2(Config) ->
     ok.
 
 t_create_with_bad_name(_Config) ->
-    BadBridgeName = <<"test_哈哈">>,
+    BadBridgeName = <<"test_哈哈"/utf8>>,
     %% Note: must contain SSL options to trigger bug.
     Cacertfile = emqx_common_test_helpers:app_path(
         emqx,
@@ -960,7 +959,7 @@ t_create_with_bad_name(_Config) ->
             <<"code">> := <<"BAD_REQUEST">>,
             <<"message">> := #{
                 <<"kind">> := <<"validation_error">>,
-                <<"reason">> := <<"Invalid name format.", _/binary>>
+                <<"reason">> := <<"invalid_map_key">>
             }
         }}} = create_bridge_http_api_v1(Opts),
     ok.

+ 3 - 2
apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl

@@ -728,8 +728,8 @@ t_prepared_statement_exists(Config) ->
     emqx_common_test_helpers:on_exit(fun() ->
         meck:unload()
     end),
-    MeckOpts = [passthrough, no_link, no_history, non_strict],
-    meck:new(emqx_postgresql, MeckOpts),
+    MeckOpts = [passthrough, no_link, no_history],
+    meck:new(epgsql, MeckOpts),
     InsertPrepStatementDupAndThenRemoveMeck =
         fun(Conn, Key, SQL, List) ->
             meck:passthrough([Conn, Key, SQL, List]),
@@ -795,6 +795,7 @@ t_prepared_statement_exists(Config) ->
             ok
         end
     ),
+    meck:unload(),
     ok.
 
 t_table_removed(Config) ->

+ 1 - 1
apps/emqx_connector/test/emqx_connector_SUITE.erl

@@ -275,7 +275,7 @@ t_create_with_bad_name_root_path({'end', _Config}) ->
     ok;
 t_create_with_bad_name_root_path(_Config) ->
     Path = [connectors],
-    BadConnectorName = <<"test_哈哈">>,
+    BadConnectorName = <<"test_哈哈"/utf8>>,
     ConnConfig0 = connector_config(),
     %% Note: must contain SSL options to trigger original bug.
     Cacertfile = emqx_common_test_helpers:app_path(

+ 1 - 1
apps/emqx_connector/test/emqx_connector_api_SUITE.erl

@@ -697,7 +697,7 @@ t_connectors_probe(Config) ->
     ok.
 
 t_create_with_bad_name(Config) ->
-    ConnectorName = <<"test_哈哈">>,
+    ConnectorName = <<"test_哈哈"/utf8>>,
     Conf0 = ?KAFKA_CONNECTOR(ConnectorName),
     %% Note: must contain SSL options to trigger original bug.
     Cacertfile = emqx_common_test_helpers:app_path(

+ 6 - 2
apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc.erl

@@ -181,12 +181,16 @@ create(#{name_var := NameVar} = Config) ->
     end.
 
 update(Config, State) ->
-    destroy(State),
+    destroy(State, false),
     create(Config).
 
 destroy(State) ->
+    destroy(State, true).
+
+destroy(State, TryDelete) ->
     emqx_dashboard_sso_oidc_session:stop(),
-    try_delete_jwks_file(State).
+    _ = TryDelete andalso try_delete_jwks_file(State),
+    ok.
 
 -dialyzer({nowarn_function, login/2}).
 login(

+ 6 - 1
apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc_session.erl

@@ -35,6 +35,8 @@
 
 -define(DEFAULT_RANDOM_LEN, 32).
 -define(NOW, erlang:system_time(millisecond)).
+-define(BACKOFF_MIN, 5000).
+-define(BACKOFF_MAX, 10000).
 
 %%------------------------------------------------------------------------------
 %% API
@@ -49,7 +51,10 @@ start(Name, #{issuer := Issuer, session_expiry := SessionExpiry0}) ->
             [
                 #{
                     issuer => Issuer,
-                    name => {local, Name}
+                    name => {local, Name},
+                    backoff_min => ?BACKOFF_MIN,
+                    backoff_max => ?BACKOFF_MAX,
+                    backoff_type => random
                 }
             ]
         )

+ 46 - 6
apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl

@@ -57,7 +57,7 @@
 
 -record(state, {
     %% TCP/SSL/UDP/DTLS Wrapped Socket
-    socket :: {esockd_transport, esockd:socket()} | {udp, _, _},
+    socket :: {esockd_transport, esockd:socket()} | {udp, _, _} | {esockd_udp_proxy, _, _},
     %% Peername of the connection
     peername :: emqx_types:peername(),
     %% Sockname of the connection
@@ -122,6 +122,9 @@ start_link(Socket = {udp, _SockPid, _Sock}, Peername, Options) ->
 start_link(esockd_transport, Sock, Options) ->
     Socket = {esockd_transport, Sock},
     Args = [self(), Socket, undefined, Options] ++ callback_modules(Options),
+    {ok, proc_lib:spawn_link(?MODULE, init, Args)};
+start_link(Socket = {esockd_udp_proxy, _ProxyId, _Sock}, Peername, Options) ->
+    Args = [self(), Socket, Peername, Options] ++ callback_modules(Options),
     {ok, proc_lib:spawn_link(?MODULE, init, Args)}.
 
 callback_modules(Options) ->
@@ -196,10 +199,14 @@ esockd_peername({udp, _SockPid, _Sock}, Peername) ->
     Peername;
 esockd_peername({esockd_transport, Sock}, _Peername) ->
     {ok, Peername} = esockd_transport:ensure_ok_or_exit(peername, [Sock]),
+    Peername;
+esockd_peername({esockd_udp_proxy, _ProxyId, _Sock}, Peername) ->
     Peername.
 
 esockd_wait(Socket = {udp, _SockPid, _Sock}) ->
     {ok, Socket};
+esockd_wait(Socket = {esockd_udp_proxy, _ProxyId, _Sock}) ->
+    {ok, Socket};
 esockd_wait({esockd_transport, Sock}) ->
     case esockd_transport:wait(Sock) of
         {ok, NSock} -> {ok, {esockd_transport, NSock}};
@@ -211,29 +218,41 @@ esockd_close({udp, _SockPid, _Sock}) ->
     %%gen_udp:close(Sock);
     ok;
 esockd_close({esockd_transport, Sock}) ->
-    esockd_transport:fast_close(Sock).
+    esockd_transport:fast_close(Sock);
+esockd_close({esockd_udp_proxy, ProxyId, _Sock}) ->
+    esockd_udp_proxy:close(ProxyId).
 
 esockd_ensure_ok_or_exit(peercert, {udp, _SockPid, _Sock}) ->
     nossl;
 esockd_ensure_ok_or_exit(Fun, {udp, _SockPid, Sock}) ->
     esockd_transport:ensure_ok_or_exit(Fun, [Sock]);
 esockd_ensure_ok_or_exit(Fun, {esockd_transport, Socket}) ->
-    esockd_transport:ensure_ok_or_exit(Fun, [Socket]).
+    esockd_transport:ensure_ok_or_exit(Fun, [Socket]);
+esockd_ensure_ok_or_exit(Fun, {esockd_udp_proxy, _ProxyId, Sock}) ->
+    esockd_transport:ensure_ok_or_exit(Fun, [Sock]).
 
 esockd_type({udp, _, _}) ->
     udp;
 esockd_type({esockd_transport, Socket}) ->
-    esockd_transport:type(Socket).
+    esockd_transport:type(Socket);
+esockd_type({esockd_udp_proxy, _ProxyId, Sock}) when is_port(Sock) ->
+    udp;
+esockd_type({esockd_udp_proxy, _ProxyId, _Sock}) ->
+    ssl.
 
 esockd_setopts({udp, _, _}, _) ->
     ok;
 esockd_setopts({esockd_transport, Socket}, Opts) ->
     %% FIXME: DTLS works??
+    esockd_transport:setopts(Socket, Opts);
+esockd_setopts({esockd_udp_proxy, _ProxyId, Socket}, Opts) ->
     esockd_transport:setopts(Socket, Opts).
 
 esockd_getstat({udp, _SockPid, Sock}, Stats) ->
     inet:getstat(Sock, Stats);
 esockd_getstat({esockd_transport, Sock}, Stats) ->
+    esockd_transport:getstat(Sock, Stats);
+esockd_getstat({esockd_udp_proxy, _ProxyId, Sock}, Stats) ->
     esockd_transport:getstat(Sock, Stats).
 
 esockd_send(Data, #state{
@@ -242,7 +261,9 @@ esockd_send(Data, #state{
 }) ->
     gen_udp:send(Sock, Ip, Port, Data);
 esockd_send(Data, #state{socket = {esockd_transport, Sock}}) ->
-    esockd_transport:send(Sock, Data).
+    esockd_transport:send(Sock, Data);
+esockd_send(Data, #state{socket = {esockd_udp_proxy, ProxyId, _Sock}}) ->
+    esockd_udp_proxy:send(ProxyId, Data).
 
 keepalive_stats(recv) ->
     emqx_pd:get_counter(recv_pkt);
@@ -250,7 +271,8 @@ keepalive_stats(send) ->
     emqx_pd:get_counter(send_pkt).
 
 is_datadram_socket({esockd_transport, _}) -> false;
-is_datadram_socket({udp, _, _}) -> true.
+is_datadram_socket({udp, _, _}) -> true;
+is_datadram_socket({esockd_udp_proxy, _ProxyId, Sock}) -> erlang:is_port(Sock).
 
 %%--------------------------------------------------------------------
 %% callbacks
@@ -461,6 +483,21 @@ handle_msg({'$gen_cast', Req}, State) ->
     with_channel(handle_cast, [Req], State);
 handle_msg({datagram, _SockPid, Data}, State) ->
     parse_incoming(Data, State);
+handle_msg(
+    {{esockd_udp_proxy, _ProxyId, _Socket} = NSock, Data, Packets},
+    State = #state{
+        chann_mod = ChannMod,
+        channel = Channel
+    }
+) ->
+    ?SLOG(debug, #{msg => "RECV_data", data => Data}),
+    Oct = iolist_size(Data),
+    inc_counter(incoming_bytes, Oct),
+    Ctx = ChannMod:info(ctx, Channel),
+    ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.received', Oct),
+
+    NState = State#state{socket = NSock},
+    {ok, next_incoming_msgs(Packets), NState};
 handle_msg({Inet, _Sock, Data}, State) when
     Inet == tcp;
     Inet == ssl
@@ -506,6 +543,9 @@ handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
 handle_msg({close, Reason}, State) ->
     ?tp(debug, force_socket_close, #{reason => Reason}),
     handle_info({sock_closed, Reason}, close_socket(State));
+handle_msg(udp_proxy_closed, State) ->
+    ?tp(debug, udp_proxy_closed, #{reason => normal}),
+    handle_info({sock_closed, normal}, close_socket(State));
 handle_msg(
     {event, connected},
     State = #state{

+ 18 - 1
apps/emqx_gateway/src/emqx_gateway_schema.erl

@@ -139,6 +139,16 @@ fields(websocket) ->
 fields(udp_listener) ->
     [
         %% some special configs for udp listener
+        {health_check,
+            sc(
+                ref(udp_health_check),
+                #{
+                    desc => ?DESC(
+                        udp_health_check
+                    ),
+                    required => false
+                }
+            )}
     ] ++
         udp_opts() ++
         common_listener_opts();
@@ -175,7 +185,12 @@ fields(dtls_opts) ->
             versions => dtls_all_available
         },
         _IsRanchListener = false
-    ).
+    );
+fields(udp_health_check) ->
+    [
+        {request, sc(binary(), #{desc => ?DESC(udp_health_check_request), required => false})},
+        {reply, sc(binary(), #{desc => ?DESC(udp_health_check_reply), required => false})}
+    ].
 
 desc(gateway) ->
     "EMQX Gateway configuration root.";
@@ -201,6 +216,8 @@ desc(dtls_opts) ->
     "Settings for DTLS protocol.";
 desc(websocket) ->
     "Websocket options";
+desc(udp_health_check) ->
+    "UDP health check";
 desc(_) ->
     undefined.
 

+ 8 - 2
apps/emqx_gateway/src/emqx_gateway_utils.erl

@@ -151,7 +151,12 @@ find_sup_child(Sup, ChildId) ->
     {ok, [pid()]}
     | {error, term()}
 when
-    ModCfg :: #{frame_mod := atom(), chann_mod := atom(), connection_mod => atom()}.
+    ModCfg :: #{
+        frame_mod := atom(),
+        chann_mod := atom(),
+        connection_mod => atom(),
+        esockd_proxy_opts => map()
+    }.
 start_listeners(Listeners, GwName, Ctx, ModCfg) ->
     start_listeners(Listeners, GwName, Ctx, ModCfg, []).
 
@@ -519,7 +524,8 @@ esockd_opts(Type, Opts0) when ?IS_ESOCKD_LISTENER(Type) ->
             max_connections,
             max_conn_rate,
             proxy_protocol,
-            proxy_protocol_timeout
+            proxy_protocol_timeout,
+            health_check
         ],
         Opts0
     ),

+ 1 - 0
apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl

@@ -92,6 +92,7 @@ end_per_suite(Config) ->
 %%------------------------------------------------------------------------------
 
 t_case_coap(_) ->
+    emqx_coap_SUITE:restart_coap_with_connection_mode(false),
     Login = fun(URI, Checker) ->
         Action = fun(Channel) ->
             Req = emqx_coap_SUITE:make_req(post),

+ 4 - 2
apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl

@@ -98,7 +98,8 @@ t_case_coap_publish(_) ->
     end,
     Case = fun(Channel, Token) ->
         Fun(Channel, Token, <<"/publish">>, ?checkMatch({ok, changed, _})),
-        Fun(Channel, Token, <<"/badpublish">>, ?checkMatch({error, uauthorized}))
+        Fun(Channel, Token, <<"/badpublish">>, ?checkMatch({error, uauthorized})),
+        true
     end,
     Mod:with_connection(Case).
 
@@ -114,7 +115,8 @@ t_case_coap_subscribe(_) ->
     end,
     Case = fun(Channel, Token) ->
         Fun(Channel, Token, <<"/subscribe">>, ?checkMatch({ok, content, _})),
-        Fun(Channel, Token, <<"/badsubscribe">>, ?checkMatch({error, uauthorized}))
+        Fun(Channel, Token, <<"/badsubscribe">>, ?checkMatch({error, uauthorized})),
+        true
     end,
     Mod:with_connection(Case).
 

+ 10 - 31
apps/emqx_gateway_coap/src/emqx_coap_channel.erl

@@ -430,7 +430,6 @@ check_token(
         clientinfo = ClientInfo
     } = Channel
 ) ->
-    IsDeleteConn = is_delete_connection_request(Msg),
     #{clientid := ClientId} = ClientInfo,
     case emqx_coap_message:extract_uri_query(Msg) of
         #{
@@ -438,39 +437,18 @@ check_token(
             <<"token">> := Token
         } ->
             call_session(handle_request, Msg, Channel);
-        #{<<"clientid">> := ReqClientId, <<"token">> := ReqToken} ->
-            case emqx_gateway_cm:call(coap, ReqClientId, {check_token, ReqToken}) of
-                undefined when IsDeleteConn ->
+        Any ->
+            %% This channel is create by this DELETE command, so here can safely close this channel
+            case Token =:= undefined andalso is_delete_connection_request(Msg) of
+                true ->
                     Reply = emqx_coap_message:piggyback({ok, deleted}, Msg),
                     {shutdown, normal, Reply, Channel};
-                undefined ->
-                    ?SLOG(info, #{
-                        msg => "remote_connection_not_found",
-                        clientid => ReqClientId,
-                        token => ReqToken
-                    }),
-                    Reply = emqx_coap_message:reset(Msg),
-                    {shutdown, normal, Reply, Channel};
                 false ->
-                    ?SLOG(info, #{
-                        msg => "request_token_invalid", clientid => ReqClientId, token => ReqToken
-                    }),
-                    Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg),
-                    {shutdown, normal, Reply, Channel};
-                true ->
-                    %% hack: since each message request can spawn a new connection
-                    %% process, we can't rely on the `inc_incoming_stats' call in
-                    %% `emqx_gateway_conn:handle_incoming' to properly keep track of
-                    %% bumping incoming requests for an existing channel.  Since this
-                    %% number is used by keepalive, we have to bump it inside the
-                    %% requested channel/connection pid so heartbeats actually work.
-                    emqx_gateway_cm:cast(coap, ReqClientId, inc_recv_pkt),
-                    call_session(handle_request, Msg, Channel)
-            end;
-        _ ->
-            ErrMsg = <<"Missing token or clientid in connection mode">>,
-            Reply = emqx_coap_message:piggyback({error, bad_request}, ErrMsg, Msg),
-            {shutdown, normal, Reply, Channel}
+                    io:format(">>> C1:~p, T1:~p~nC2:~p~n", [ClientId, Token, Any]),
+                    ErrMsg = <<"Missing token or clientid in connection mode">>,
+                    Reply = emqx_coap_message:piggyback({error, bad_request}, ErrMsg, Msg),
+                    {ok, {outgoing, Reply}, Channel}
+            end
     end.
 
 run_conn_hooks(
@@ -785,6 +763,7 @@ process_connection(
 ) when
     ConnState == connected
 ->
+    %% TODO should take over the session here
     Queries = emqx_coap_message:extract_uri_query(Req),
     ErrMsg0 =
         case Queries of

+ 67 - 0
apps/emqx_gateway_coap/src/emqx_coap_proxy_conn.erl

@@ -0,0 +1,67 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_coap_proxy_conn).
+
+-behaviour(esockd_udp_proxy_connection).
+
+-include("emqx_coap.hrl").
+
+-export([initialize/1, find_or_create/4, get_connection_id/4, dispatch/3, close/2]).
+
+%%--------------------------------------------------------------------
+%% Callbacks
+%%--------------------------------------------------------------------
+initialize(_Opts) ->
+    emqx_coap_frame:initial_parse_state(#{}).
+
+find_or_create(CId, Transport, Peer, Opts) ->
+    case emqx_gateway_cm_registry:lookup_channels(coap, CId) of
+        [Pid] ->
+            {ok, Pid};
+        [] ->
+            emqx_gateway_conn:start_link(Transport, Peer, Opts)
+    end.
+
+get_connection_id(_Transport, _Peer, State, Data) ->
+    case parse_incoming(Data, [], State) of
+        {[Msg | _] = Packets, NState} ->
+            case emqx_coap_message:extract_uri_query(Msg) of
+                #{
+                    <<"clientid">> := ClientId
+                } ->
+                    {ok, ClientId, Packets, NState};
+                _ ->
+                    ErrMsg = <<"Missing token or clientid in connection mode">>,
+                    Reply = emqx_coap_message:piggyback({error, bad_request}, ErrMsg, Msg),
+                    Bin = emqx_coap_frame:serialize_pkt(Reply, emqx_coap_frame:serialize_opts()),
+                    {error, Bin}
+            end;
+        _Error ->
+            invalid
+    end.
+
+dispatch(Pid, _State, Packet) ->
+    erlang:send(Pid, Packet).
+
+close(Pid, _State) ->
+    erlang:send(Pid, udp_proxy_closed).
+
+parse_incoming(<<>>, Packets, State) ->
+    {Packets, State};
+parse_incoming(Data, Packets, State) ->
+    {ok, Packet, Rest, NParseState} = emqx_coap_frame:parse(Data, State),
+    parse_incoming(Rest, [Packet | Packets], NParseState).

+ 14 - 3
apps/emqx_gateway_coap/src/emqx_gateway_coap.erl

@@ -20,7 +20,7 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx_gateway/include/emqx_gateway.hrl").
 
-%% define a gateway named stomp
+%% define a gateway named coap
 -gateway(#{
     name => coap,
     callback_module => ?MODULE,
@@ -58,10 +58,11 @@ on_gateway_load(
     Ctx
 ) ->
     Listeners = normalize_config(Config),
-    ModCfg = #{
+    ModCfg = maps:merge(connection_opts(Config), #{
         frame_mod => emqx_coap_frame,
         chann_mod => emqx_coap_channel
-    },
+    }),
+
     case
         start_listeners(
             Listeners, GwName, Ctx, ModCfg
@@ -105,3 +106,13 @@ on_gateway_unload(
 ) ->
     Listeners = normalize_config(Config),
     stop_listeners(GwName, Listeners).
+
+connection_opts(#{connection_required := false}) ->
+    #{};
+connection_opts(_) ->
+    #{
+        connection_mod => esockd_udp_proxy,
+        esockd_proxy_opts => #{
+            connection_mod => emqx_coap_proxy_conn
+        }
+    }.

+ 22 - 12
apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl

@@ -165,7 +165,8 @@ t_connection(_) ->
             emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
         )
     end,
-    do(Action).
+    do(Action),
+    ok.
 
 t_connection_with_short_param_name(_) ->
     Action = fun(Channel) ->
@@ -330,7 +331,8 @@ t_publish(_) ->
                 ?assertEqual(Payload, Msg#message.payload)
         after 500 ->
             ?assert(false)
-        end
+        end,
+        true
     end,
     with_connection(Topics, Action).
 
@@ -360,7 +362,9 @@ t_publish_with_retain_qos_expiry(_) ->
                 ?assertEqual(Payload, Msg#message.payload)
         after 500 ->
             ?assert(false)
-        end
+        end,
+
+        true
     end,
     with_connection(Topics, Action),
 
@@ -392,7 +396,8 @@ t_subscribe(_) ->
 
         #coap_content{payload = PayloadRecv} = Notify,
 
-        ?assertEqual(Payload, PayloadRecv)
+        ?assertEqual(Payload, PayloadRecv),
+        true
     end,
 
     with_connection(Topics, Fun),
@@ -431,7 +436,8 @@ t_subscribe_with_qos_opt(_) ->
 
         #coap_content{payload = PayloadRecv} = Notify,
 
-        ?assertEqual(Payload, PayloadRecv)
+        ?assertEqual(Payload, PayloadRecv),
+        true
     end,
 
     with_connection(Topics, Fun),
@@ -468,7 +474,8 @@ t_un_subscribe(_) ->
         {ok, nocontent, _} = do_request(Channel, URI, UnReq),
         ?LOGT("un observer topic:~ts~n", [Topic]),
         timer:sleep(100),
-        ?assertEqual([], emqx:subscribers(Topic))
+        ?assertEqual([], emqx:subscribers(Topic)),
+        true
     end,
 
     with_connection(Topics, Fun).
@@ -497,7 +504,8 @@ t_observe_wildcard(_) ->
 
         #coap_content{payload = PayloadRecv} = Notify,
 
-        ?assertEqual(Payload, PayloadRecv)
+        ?assertEqual(Payload, PayloadRecv),
+        true
     end,
 
     with_connection(Fun).
@@ -530,7 +538,8 @@ t_clients_api(_) ->
         {204, _} =
             request(delete, "/gateways/coap/clients/client1"),
         timer:sleep(200),
-        {200, #{data := []}} = request(get, "/gateways/coap/clients")
+        {200, #{data := []}} = request(get, "/gateways/coap/clients"),
+        false
     end,
     with_connection(Fun).
 
@@ -560,7 +569,8 @@ t_clients_subscription_api(_) ->
 
         {204, _} = request(delete, Path ++ "/tx"),
 
-        {200, []} = request(get, Path)
+        {200, []} = request(get, Path),
+        true
     end,
     with_connection(Fun).
 
@@ -578,7 +588,8 @@ t_clients_get_subscription_api(_) ->
 
         observe(Channel, Token, false),
 
-        {200, []} = request(get, Path)
+        {200, []} = request(get, Path),
+        true
     end,
     with_connection(Fun).
 
@@ -773,8 +784,7 @@ with_connection(Action) ->
     Fun = fun(Channel) ->
         Token = connection(Channel),
         timer:sleep(100),
-        Action(Channel, Token),
-        disconnection(Channel, Token),
+        _ = Action(Channel, Token) andalso disconnection(Channel, Token),
         timer:sleep(100)
     end,
     do(Fun).

+ 2 - 1
apps/emqx_gateway_coap/test/emqx_coap_api_SUITE.erl

@@ -207,7 +207,8 @@ test_recv_coap_request(UdpSock) ->
 test_send_coap_response(UdpSock, Host, Port, Code, Content, Request) ->
     is_list(Host) orelse error("Host is not a string"),
     {ok, IpAddr} = inet:getaddr(Host, inet),
-    Response = emqx_coap_message:piggyback(Code, Content, Request),
+    Response0 = emqx_coap_message:piggyback(Code, Content, Request),
+    Response = Response0#coap_message{options = #{uri_query => [<<"clientid=client1">>]}},
     ?LOGT("test_send_coap_response Response=~p", [Response]),
     Binary = emqx_coap_frame:serialize_pkt(Response, undefined),
     ok = gen_udp:send(UdpSock, IpAddr, Port, Binary).

+ 1 - 1
apps/emqx_management/src/emqx_mgmt_api_banned.erl

@@ -171,7 +171,7 @@ banned(post, #{body := Body}) ->
         {error, Reason} ->
             ErrorReason = io_lib:format("~p", [Reason]),
             {400, 'BAD_REQUEST', list_to_binary(ErrorReason)};
-        Ban ->
+        {ok, Ban} ->
             case emqx_banned:create(Ban) of
                 {ok, Banned} ->
                     {200, format(Banned)};

+ 29 - 0
apps/emqx_management/test/emqx_mgmt_api_listeners_SUITE.erl

@@ -418,6 +418,35 @@ t_update_listener_zone(_Config) ->
     ?assertMatch({error, {_, 400, _}}, request(put, Path, [], AddConf1)),
     ?assertMatch(#{<<"zone">> := <<"zone1">>}, request(put, Path, [], AddConf2)).
 
+t_update_listener_max_conn_rate({init, Config}) ->
+    Config;
+t_update_listener_max_conn_rate({'end', _Config}) ->
+    ok;
+t_update_listener_max_conn_rate(_Config) ->
+    ListenerId = <<"tcp:default">>,
+    Path = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]),
+    Conf = request(get, Path, [], []),
+    %% Check that default is infinity
+    ?assertMatch(#{<<"max_conn_rate">> := <<"infinity">>}, Conf),
+    %% Update to infinity
+    UpdateConfToInfinity = Conf#{<<"max_conn_rate">> => <<"infinity">>},
+    ?assertMatch(
+        #{<<"max_conn_rate">> := <<"infinity">>},
+        request(put, Path, [], UpdateConfToInfinity)
+    ),
+    %% Update to 42/s
+    UpdateConfTo42PerSec = Conf#{<<"max_conn_rate">> => <<"42/s">>},
+    ?assertMatch(
+        #{<<"max_conn_rate">> := <<"42/s">>},
+        request(put, Path, [], UpdateConfTo42PerSec)
+    ),
+    %% Update back to infinity
+    UpdateConfToInfinity = Conf#{<<"max_conn_rate">> => <<"infinity">>},
+    ?assertMatch(
+        #{<<"max_conn_rate">> := <<"infinity">>},
+        request(put, Path, [], UpdateConfToInfinity)
+    ).
+
 t_delete_nonexistent_listener(Config) when is_list(Config) ->
     NonExist = emqx_mgmt_api_test_util:api_path(["listeners", "tcp:nonexistent"]),
     ?assertMatch(

+ 8 - 0
apps/emqx_management/test/emqx_mgmt_api_test_util.erl

@@ -154,6 +154,14 @@ do_request_api(Method, Request, Opts) ->
             {error, Reason}
     end.
 
+simplify_result(Res) ->
+    case Res of
+        {error, {{_, Status, _}, _, Body}} ->
+            {Status, Body};
+        {ok, {{_, Status, _}, _, Body}} ->
+            {Status, Body}
+    end.
+
 auth_header_() ->
     emqx_common_test_http:default_auth_header().
 

+ 24 - 28
apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl

@@ -452,18 +452,12 @@ ref(Struct) -> hoconsc:ref(?MODULE, Struct).
 mk(Type, Opts) -> hoconsc:mk(Type, Opts).
 array(Type) -> hoconsc:array(Type).
 
-%% FIXME: all examples
 example_input_create() ->
     #{
-        <<"sql_check">> =>
+        <<"message_transformation">> =>
             #{
-                summary => <<"Using a SQL check">>,
-                value => example_transformation([example_sql_check()])
-            },
-        <<"avro_check">> =>
-            #{
-                summary => <<"Using an Avro schema check">>,
-                value => example_transformation([example_avro_check()])
+                summary => <<"Simple message transformation">>,
+                value => example_transformation()
             }
     }.
 
@@ -472,7 +466,7 @@ example_input_update() ->
         <<"update">> =>
             #{
                 summary => <<"Update">>,
-                value => example_transformation([example_sql_check()])
+                value => example_transformation()
             }
     }.
 
@@ -493,20 +487,28 @@ example_input_dryrun_transformation() ->
             #{
                 summary => <<"Test an input against a configuration">>,
                 value => #{
-                    todo => true
+                    message => #{
+                        client_attrs => #{},
+                        payload => <<"{}">>,
+                        qos => 2,
+                        retain => true,
+                        topic => <<"t/u/v">>,
+                        user_property => #{}
+                    },
+                    transformation => example_transformation()
                 }
             }
     }.
 
 example_return_list() ->
-    OtherVal0 = example_transformation([example_avro_check()]),
+    OtherVal0 = example_transformation(),
     OtherVal = OtherVal0#{name => <<"other_transformation">>},
     #{
         <<"list">> =>
             #{
                 summary => <<"List">>,
                 value => [
-                    example_transformation([example_sql_check()]),
+                    example_transformation(),
                     OtherVal
                 ]
             }
@@ -547,29 +549,23 @@ example_return_metrics() ->
             }
     }.
 
-example_transformation(Checks) ->
+example_transformation() ->
     #{
         name => <<"my_transformation">>,
         enable => true,
         description => <<"my transformation">>,
         tags => [<<"transformation">>],
         topics => [<<"t/+">>],
-        strategy => <<"all_pass">>,
         failure_action => <<"drop">>,
         log_failure => #{<<"level">> => <<"info">>},
-        checks => Checks
-    }.
-
-example_sql_check() ->
-    #{
-        type => <<"sql">>,
-        sql => <<"select payload.temp as t where t > 10">>
-    }.
-
-example_avro_check() ->
-    #{
-        type => <<"avro">>,
-        schema => <<"my_avro_schema">>
+        payload_decoder => #{<<"type">> => <<"json">>},
+        payload_encoder => #{<<"type">> => <<"json">>},
+        operations => [
+            #{
+                key => <<"topic">>,
+                value => <<"concat([topic, '/', payload.t])">>
+            }
+        ]
     }.
 
 error_schema(Code, Message) ->

+ 2 - 0
apps/emqx_message_transformation/src/emqx_message_transformation_schema.erl

@@ -231,6 +231,8 @@ do_validate_unique_names([#{<<"name">> := Name} | _Rest], Acc) when is_map_key(N
 do_validate_unique_names([#{<<"name">> := Name} | Rest], Acc) ->
     do_validate_unique_names(Rest, Acc#{Name => true}).
 
+validate_unique_topics([]) ->
+    {error, <<"at least one topic filter must be defined">>};
 validate_unique_topics(Topics) ->
     Grouped = maps:groups_from_list(
         fun(T) -> T end,

+ 13 - 0
apps/emqx_message_transformation/test/emqx_message_transformation_tests.erl

@@ -87,6 +87,19 @@ schema_test_() ->
                     )
                 ])
             )},
+        {"topics must be non-empty",
+            ?_assertThrow(
+                {_Schema, [
+                    #{
+                        reason := <<"at least one topic filter must be defined", _/binary>>,
+                        value := [],
+                        kind := validation_error
+                    }
+                ]},
+                parse_and_check([
+                    transformation(<<"foo">>, [dummy_operation()], #{<<"topics">> => []})
+                ])
+            )},
         {"names are unique",
             ?_assertThrow(
                 {_Schema, [

+ 14 - 3
apps/emqx_plugins/test/emqx_plugins_SUITE.erl

@@ -906,7 +906,8 @@ group_t_cluster_leave(Config) ->
 %% hooks added by the plugin's `application:start/2' callback are indeed in place.
 %% See also: https://github.com/emqx/emqx/issues/13378
 t_start_node_with_plugin_enabled({init, Config}) ->
-    #{package := Package, shdir := InstallDir} = get_demo_plugin_package(),
+    #{package := Package} = get_demo_plugin_package(),
+    Basename = filename:basename(Package),
     NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX),
     AppSpecs = [
         emqx,
@@ -917,7 +918,7 @@ t_start_node_with_plugin_enabled({init, Config}) ->
                 #{
                     plugins =>
                         #{
-                            install_dir => InstallDir,
+                            install_dir => <<"plugins">>,
                             states =>
                                 [
                                     #{
@@ -938,6 +939,14 @@ t_start_node_with_plugin_enabled({init, Config}) ->
         ],
         #{work_dir => emqx_cth_suite:work_dir(?FUNCTION_NAME, Config)}
     ),
+    lists:foreach(
+        fun(#{work_dir := WorkDir}) ->
+            Destination = filename:join([WorkDir, "plugins", Basename]),
+            ok = filelib:ensure_dir(Destination),
+            {ok, _} = file:copy(Package, Destination)
+        end,
+        Specs
+    ),
     Names = [Name1, Name2],
     Nodes = [emqx_cth_cluster:node_name(N) || N <- Names],
     [
@@ -955,7 +964,9 @@ t_start_node_with_plugin_enabled(Config) when is_list(Config) ->
     ?check_trace(
         #{timetrap => 10_000},
         begin
-            [N1, N2 | _] = emqx_cth_cluster:start(NodeSpecs),
+            %% Hack: we use `restart' here to disable the clean slate verification, as we
+            %% just created and populated the `plugins' directory...
+            [N1, N2 | _] = lists:flatmap(fun emqx_cth_cluster:restart/1, NodeSpecs),
             ?ON(N1, assert_started_and_hooks_loaded()),
             ?ON(N2, assert_started_and_hooks_loaded()),
             %% Now make them join.

+ 0 - 1
apps/emqx_prometheus/.gitignore

@@ -13,7 +13,6 @@ rel/example_project
 emqx_prometheus.d
 ct.coverdata
 logs/
-data/
 test/ct.cover.spec
 cover/
 erlang.mk

+ 15 - 17
apps/emqx_prometheus/src/emqx_prometheus.erl

@@ -78,6 +78,10 @@
     do_stop/0
 ]).
 
+-ifdef(TEST).
+-export([cert_expiry_at_from_path/1]).
+-endif.
+
 %%--------------------------------------------------------------------
 %% Macros
 %%--------------------------------------------------------------------
@@ -950,10 +954,8 @@ cert_expiry_at_from_path(Path0) ->
             {ok, PemBin} ->
                 [CertEntry | _] = public_key:pem_decode(PemBin),
                 Cert = public_key:pem_entry_decode(CertEntry),
-                %% TODO: Not fully tested for all certs type
-                {'utcTime', NotAfterUtc} =
-                    Cert#'Certificate'.'tbsCertificate'#'TBSCertificate'.validity#'Validity'.'notAfter',
-                utc_time_to_epoch(NotAfterUtc);
+                %% XXX: Only pem cert supported by listeners
+                not_after_epoch(Cert);
             {error, Reason} ->
                 ?SLOG(error, #{
                     msg => "read_cert_file_failed",
@@ -976,21 +978,17 @@ cert_expiry_at_from_path(Path0) ->
             0
     end.
 
-utc_time_to_epoch(UtcTime) ->
-    date_to_expiry_epoch(utc_time_to_datetime(UtcTime)).
-
-utc_time_to_datetime(Str) ->
-    {ok, [Year, Month, Day, Hour, Minute, Second], _} = io_lib:fread(
-        "~2d~2d~2d~2d~2d~2dZ", Str
-    ),
-    %% Always Assuming YY is in 2000
-    {{2000 + Year, Month, Day}, {Hour, Minute, Second}}.
-
 %% 62167219200 =:= calendar:datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}).
 -define(EPOCH_START, 62167219200).
--spec date_to_expiry_epoch(calendar:datetime()) -> Seconds :: non_neg_integer().
-date_to_expiry_epoch(DateTime) ->
-    calendar:datetime_to_gregorian_seconds(DateTime) - ?EPOCH_START.
+not_after_epoch(#'Certificate'{
+    'tbsCertificate' = #'TBSCertificate'{
+        validity =
+            #'Validity'{'notAfter' = NotAfter}
+    }
+}) ->
+    pubkey_cert:'time_str_2_gregorian_sec'(NotAfter) - ?EPOCH_START;
+not_after_epoch(_) ->
+    0.
 
 %%========================================
 %% Mria

+ 21 - 0
apps/emqx_prometheus/test/data/cert.crt

@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDfzCCAmegAwIBAgIUJ3pE/Dwffa5gKNHY2L8HmazicmowDQYJKoZIhvcNAQEL
+BQAwZzELMAkGA1UEBhMCVVMxEzARBgNVBAgMCkNhbGlmb3JuaWExFjAUBgNVBAcM
+DVNhbiBGcmFuY2lzY28xFTATBgNVBAoMDEV4YW1wbGUgSW5jLjEUMBIGA1UEAwwL
+ZXhhbXBsZS5jb20wIBcNMjQwNzAzMTAyOTMzWhgPMjA1NDA2MjYxMDI5MzNaMGcx
+CzAJBgNVBAYTAlVTMRMwEQYDVQQIDApDYWxpZm9ybmlhMRYwFAYDVQQHDA1TYW4g
+RnJhbmNpc2NvMRUwEwYDVQQKDAxFeGFtcGxlIEluYy4xFDASBgNVBAMMC2V4YW1w
+bGUuY29tMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEArFZKxzsxCaGP
+rVhilTd4PKk9jVrBLQ4xaFG6tmmlzjBCp+E35EulND4gpWZSUs9bYO/C+qykKmrL
+J7TddGBVXe6lbl6mMHqZzHUp9mJdvPBSHcqOHc2E/UiBwOpN4tatx6UdK+VEQySr
+z+dtc0Az5Itkoy/SvAu1Zzdq3d3MfxaTUvCmWfeR2huTalNQkG1jQ0C2CjCU9Z1f
+Ex+y1MzxNhVrrdExC8Vwrb4TDlue8/XwJ4A4gBJYNbVAwALcSKnF56nRib3evE3J
+Irvy2Rt4aC694JawWLPzJ1e2Rz8WBzCRPJAmaV4iD66sU8BMkmbCV+mMmF673s3R
+sS4kGqklvQIDAQABoyEwHzAdBgNVHQ4EFgQU0tDKnCDey6fKrzs7caDfS41Dii4w
+DQYJKoZIhvcNAQELBQADggEBAEIKvrSuUgpkIEUDV+UMr/5xUKkDyjNi4rwkBA6X
+Ej0HskXg6u9wOIkBKwpQbleDFICdyqXMhGMjN4050PQCizaInBJBz77ah47UwGGQ
+P+wavbcdHR9cbhewhCo6EtbCclPY1LXq4OFkgHMToLFzXC4S/kLX/KrhVApGHskO
+Ad4U4gmMtIalruz5Mzc4YuSaAjbRI9v0IxhvS8JU0uoOwhIstkrMlFc26SU6EcZ9
+k88gVmmqEnsvmJi4gn4XPgvJB8hPs0/OMDBCVjAM8VaxZZ6sqlTT9FTGaKbIJdDc
+KjT7VdbhVcuZo4s1u9gQzJNU2WHlHLwZi1wCjTC1vTE/HrQ=
+-----END CERTIFICATE-----

+ 14 - 0
apps/emqx_prometheus/test/emqx_prometheus_SUITE.erl

@@ -211,6 +211,16 @@ t_push_gateway(_) ->
 
     ok.
 
+t_cert_expiry_epoch(_) ->
+    Path = some_pem_path(),
+    ?assertEqual(
+        2666082573,
+        emqx_prometheus:cert_expiry_at_from_path(Path)
+    ).
+
+%%--------------------------------------------------------------------
+%% Helper functions
+
 start_mock_pushgateway(Port) ->
     ensure_loaded(cowboy),
     ensure_loaded(ranch),
@@ -249,3 +259,7 @@ init(Req0, Opts) ->
     RespHeader = #{<<"content-type">> => <<"text/plain; charset=utf-8">>},
     Req = cowboy_req:reply(200, RespHeader, <<"OK">>, Req0),
     {ok, Req, Opts}.
+
+some_pem_path() ->
+    Dir = code:lib_dir(emqx_prometheus, test),
+    _Path = filename:join([Dir, "data", "cert.crt"]).

+ 122 - 60
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -143,6 +143,15 @@
     perform_health_check => boolean()
 }.
 
+%% calls/casts/generic timeouts
+-record(add_channel, {channel_id :: channel_id(), config :: map()}).
+-record(start_channel_health_check, {channel_id :: channel_id()}).
+
+-type generic_timeout(Id, Content) :: {{timeout, Id}, timeout(), Content}.
+-type start_channel_health_check_action() :: generic_timeout(
+    #start_channel_health_check{}, #start_channel_health_check{}
+).
+
 %%------------------------------------------------------------------------------
 %% API
 %%------------------------------------------------------------------------------
@@ -405,7 +414,7 @@ add_channel(ResId, ChannelId, Config) ->
 ) ->
     ok | {error, term()}.
 add_channel(ResId, ChannelId, Config, Opts) ->
-    Result = safe_call(ResId, {add_channel, ChannelId, Config}, ?T_OPERATION),
+    Result = safe_call(ResId, #add_channel{channel_id = ChannelId, config = Config}, ?T_OPERATION),
     maybe
         true ?= maps:get(perform_health_check, Opts, true),
         %% Wait for health_check to finish
@@ -570,7 +579,9 @@ handle_event({call, From}, health_check, _State, Data) ->
     handle_manual_resource_health_check(From, Data);
 handle_event({call, From}, {channel_health_check, ChannelId}, _State, Data) ->
     handle_manual_channel_health_check(From, Data, ChannelId);
-% State: CONNECTING
+%%--------------------------
+%% State: CONNECTING
+%%--------------------------
 handle_event(enter, _OldState, ?state_connecting = State, Data) ->
     ok = log_status_consistency(State, Data),
     {keep_state_and_data, [{state_timeout, 0, health_check}]};
@@ -582,25 +593,39 @@ handle_event(
     {call, From}, {remove_channel, ChannelId}, ?state_connecting = _State, Data
 ) ->
     handle_remove_channel(From, ChannelId, Data);
+%%--------------------------
 %% State: CONNECTED
 %% The connected state is entered after a successful on_start/2 of the callback mod
 %% and successful health_checks
+%%--------------------------
 handle_event(enter, _OldState, ?state_connected = State, Data) ->
     ok = log_status_consistency(State, Data),
     _ = emqx_alarm:safe_deactivate(Data#data.id),
     ?tp(resource_connected_enter, #{}),
-    {keep_state_and_data, health_check_actions(Data)};
+    {keep_state_and_data, resource_health_check_actions(Data)};
 handle_event(state_timeout, health_check, ?state_connected, Data) ->
     start_resource_health_check(Data);
 handle_event(
-    {call, From}, {add_channel, ChannelId, Config}, ?state_connected = _State, Data
+    {call, From},
+    #add_channel{channel_id = ChannelId, config = Config},
+    ?state_connected = _State,
+    Data
 ) ->
     handle_add_channel(From, Data, ChannelId, Config);
 handle_event(
     {call, From}, {remove_channel, ChannelId}, ?state_connected = _State, Data
 ) ->
     handle_remove_channel(From, ChannelId, Data);
+handle_event(
+    {timeout, #start_channel_health_check{channel_id = ChannelId}},
+    _,
+    ?state_connected = _State,
+    Data
+) ->
+    handle_start_channel_health_check(Data, ChannelId);
+%%--------------------------
 %% State: DISCONNECTED
+%%--------------------------
 handle_event(enter, _OldState, ?state_disconnected = State, Data) ->
     ok = log_status_consistency(State, Data),
     ?tp(resource_disconnected_enter, #{}),
@@ -608,14 +633,18 @@ handle_event(enter, _OldState, ?state_disconnected = State, Data) ->
 handle_event(state_timeout, auto_retry, ?state_disconnected, Data) ->
     ?tp(resource_auto_reconnect, #{}),
     start_resource(Data, undefined);
+%%--------------------------
 %% State: STOPPED
 %% The stopped state is entered after the resource has been explicitly stopped
+%%--------------------------
 handle_event(enter, _OldState, ?state_stopped = State, Data) ->
     ok = log_status_consistency(State, Data),
     {keep_state_and_data, []};
+%%--------------------------
 %% The following events can be handled in any other state
+%%--------------------------
 handle_event(
-    {call, From}, {add_channel, ChannelId, Config}, State, Data
+    {call, From}, #add_channel{channel_id = ChannelId, config = Config}, State, Data
 ) ->
     handle_not_connected_add_channel(From, ChannelId, Config, State, Data);
 handle_event(
@@ -645,6 +674,9 @@ handle_event(
     is_map_key(Pid, CHCWorkers)
 ->
     handle_channel_health_check_worker_down(Data0, Pid, Res);
+handle_event({timeout, #start_channel_health_check{channel_id = _}}, _, _State, _Data) ->
+    %% Stale health check action; currently, we only probe channel health when connected.
+    keep_state_and_data;
 % Ignore all other events
 handle_event(EventType, EventData, State, Data) ->
     ?SLOG(
@@ -702,7 +734,7 @@ retry_actions(Data) ->
             [{state_timeout, RetryInterval, auto_retry}]
     end.
 
-health_check_actions(Data) ->
+resource_health_check_actions(Data) ->
     [{state_timeout, health_check_interval(Data#data.opts), health_check}].
 
 handle_remove_event(From, ClearMetrics, Data) ->
@@ -1079,7 +1111,7 @@ continue_resource_health_check_connected(NewStatus, Data0) ->
             {Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0),
             Data2 = channels_health_check(?status_connected, Data1),
             Data = update_state(Data2, Data0),
-            Actions = Replies ++ health_check_actions(Data),
+            Actions = Replies ++ resource_health_check_actions(Data),
             {keep_state, Data, Actions};
         _ ->
             ?SLOG(warning, #{
@@ -1091,23 +1123,28 @@ continue_resource_health_check_connected(NewStatus, Data0) ->
             %% subset of resource manager state...  But there should be a conversion
             %% between the two here, as resource manager also has `stopped', which is
             %% not a valid status at the time of writing.
-            {Replies, Data} = reply_pending_resource_health_check_callers(NewStatus, Data0),
-            {next_state, NewStatus, channels_health_check(NewStatus, Data), Replies}
+            {Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0),
+            Data = channels_health_check(NewStatus, Data1),
+            Actions = Replies,
+            {next_state, NewStatus, Data, Actions}
     end.
 
 %% Continuation to be used when the current resource state is not `?state_connected'.
 continue_resource_health_check_not_connected(NewStatus, Data0) ->
-    {Replies, Data} = reply_pending_resource_health_check_callers(NewStatus, Data0),
+    {Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0),
     case NewStatus of
         ?status_connected ->
-            {next_state, ?state_connected, channels_health_check(?status_connected, Data), Replies};
+            Data = channels_health_check(?status_connected, Data1),
+            Actions = Replies,
+            {next_state, ?state_connected, Data, Actions};
         ?status_connecting ->
-            Actions = Replies ++ health_check_actions(Data),
-            {next_state, ?status_connecting, channels_health_check(?status_connecting, Data),
-                Actions};
+            Data = channels_health_check(?status_connecting, Data1),
+            Actions = Replies ++ resource_health_check_actions(Data),
+            {next_state, ?status_connecting, Data, Actions};
         ?status_disconnected ->
-            {next_state, ?state_disconnected, channels_health_check(?status_disconnected, Data),
-                Replies}
+            Data = channels_health_check(?status_disconnected, Data1),
+            Actions = Replies,
+            {next_state, ?state_disconnected, Data, Actions}
     end.
 
 handle_manual_channel_health_check(From, #data{state = undefined}, _ChannelId) ->
@@ -1269,38 +1306,60 @@ resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1) ->
         )
     ).
 
+-spec generic_timeout_action(Id, timeout(), Content) -> generic_timeout(Id, Content).
+generic_timeout_action(Id, Timeout, Content) ->
+    {{timeout, Id}, Timeout, Content}.
+
+-spec start_channel_health_check_action(channel_id(), map(), map(), data() | timeout()) ->
+    [start_channel_health_check_action()].
+start_channel_health_check_action(ChannelId, NewChanStatus, PreviousChanStatus, Data = #data{}) ->
+    Timeout = get_channel_health_check_interval(ChannelId, NewChanStatus, PreviousChanStatus, Data),
+    Event = #start_channel_health_check{channel_id = ChannelId},
+    [generic_timeout_action(Event, Timeout, Event)].
+
+get_channel_health_check_interval(ChannelId, NewChanStatus, PreviousChanStatus, Data) ->
+    emqx_utils:foldl_while(
+        fun
+            (#{config := #{resource_opts := #{health_check_interval := HCInterval}}}, _Acc) ->
+                {halt, HCInterval};
+            (_, Acc) ->
+                {cont, Acc}
+        end,
+        ?HEALTHCHECK_INTERVAL,
+        [
+            NewChanStatus,
+            PreviousChanStatus,
+            maps:get(ChannelId, Data#data.added_channels, #{})
+        ]
+    ).
+
 %% Currently, we only call resource channel health checks when the underlying resource is
 %% `?status_connected'.
 -spec trigger_health_check_for_added_channels(data()) -> data().
 trigger_health_check_for_added_channels(Data0 = #data{hc_workers = HCWorkers0}) ->
     #{
-        channel := CHCWorkers0 =
+        channel :=
             #{
-                pending := CPending0,
+                %% TODO: rm pending
+                %% pending := CPending0,
                 ongoing := Ongoing0
             }
     } = HCWorkers0,
     NewOngoing = maps:filter(
         fun(ChannelId, OldStatus) ->
-            not is_map_key(ChannelId, Ongoing0) and
+            (not is_map_key(ChannelId, Ongoing0)) andalso
                 channel_status_is_channel_added(OldStatus)
         end,
         Data0#data.added_channels
     ),
     ChannelsToCheck = maps:keys(NewOngoing),
-    case ChannelsToCheck of
-        [] ->
-            %% Nothing to do.
-            Data0;
-        [ChannelId | Rest] ->
-            %% Shooting one check at a time.  We could increase concurrency in the future.
-            CHCWorkers = CHCWorkers0#{
-                pending := CPending0 ++ Rest,
-                ongoing := maps:merge(Ongoing0, NewOngoing)
-            },
-            Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}},
-            start_channel_health_check(Data1, ChannelId)
-    end.
+    lists:foldl(
+        fun(ChannelId, Acc) ->
+            start_channel_health_check(Acc, ChannelId)
+        end,
+        Data0,
+        ChannelsToCheck
+    ).
 
 -spec continue_channel_health_check_connected(
     channel_id(), channel_status_map(), channel_status_map(), data()
@@ -1338,12 +1397,29 @@ continue_channel_health_check_connected_no_update_during_check(ChannelId, OldSta
     end,
     Data.
 
+-spec handle_start_channel_health_check(data(), channel_id()) ->
+    gen_statem:event_handler_result(state(), data()).
+handle_start_channel_health_check(Data0, ChannelId) ->
+    Data = start_channel_health_check(Data0, ChannelId),
+    {keep_state, Data}.
+
 -spec start_channel_health_check(data(), channel_id()) -> data().
-start_channel_health_check(#data{} = Data0, ChannelId) ->
+start_channel_health_check(
+    #data{added_channels = AddedChannels, hc_workers = #{channel := #{ongoing := CHCOngoing0}}} =
+        Data0,
+    ChannelId
+) when
+    is_map_key(ChannelId, AddedChannels) andalso (not is_map_key(ChannelId, CHCOngoing0))
+->
     #data{hc_workers = HCWorkers0 = #{channel := CHCWorkers0}} = Data0,
     WorkerPid = spawn_channel_health_check_worker(Data0, ChannelId),
-    HCWorkers = HCWorkers0#{channel := CHCWorkers0#{WorkerPid => ChannelId}},
-    Data0#data{hc_workers = HCWorkers}.
+    ChannelStatus = maps:get(ChannelId, AddedChannels),
+    CHCOngoing = CHCOngoing0#{ChannelId => ChannelStatus},
+    CHCWorkers = CHCWorkers0#{WorkerPid => ChannelId, ongoing := CHCOngoing},
+    HCWorkers = HCWorkers0#{channel := CHCWorkers},
+    Data0#data{hc_workers = HCWorkers};
+start_channel_health_check(Data, _ChannelId) ->
+    Data.
 
 -spec spawn_channel_health_check_worker(data(), channel_id()) -> pid().
 spawn_channel_health_check_worker(#data{} = Data, ChannelId) ->
@@ -1380,33 +1456,19 @@ handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) ->
     #{ongoing := Ongoing0} = CHCWorkers1,
     {PreviousChanStatus, Ongoing1} = maps:take(ChannelId, Ongoing0),
     CHCWorkers2 = CHCWorkers1#{ongoing := Ongoing1},
-    CHCWorkers3 = emqx_utils_maps:deep_remove([ongoing, ChannelId], CHCWorkers2),
     Data1 = Data0#data{added_channels = AddedChannels},
     {Replies, Data2} = reply_pending_channel_health_check_callers(ChannelId, NewStatus, Data1),
-    case CHCWorkers1 of
-        #{pending := [NextChannelId | Rest]} ->
-            CHCWorkers = CHCWorkers3#{pending := Rest},
-            HCWorkers = HCWorkers0#{channel := CHCWorkers},
-            Data3 = Data2#data{hc_workers = HCWorkers},
-            Data4 = continue_channel_health_check_connected(
-                ChannelId,
-                PreviousChanStatus,
-                CurrentStatus,
-                Data3
-            ),
-            Data = start_channel_health_check(Data4, NextChannelId),
-            {keep_state, update_state(Data, Data0), Replies};
-        #{pending := []} ->
-            HCWorkers = HCWorkers0#{channel := CHCWorkers3},
-            Data3 = Data2#data{hc_workers = HCWorkers},
-            Data = continue_channel_health_check_connected(
-                ChannelId,
-                PreviousChanStatus,
-                CurrentStatus,
-                Data3
-            ),
-            {keep_state, update_state(Data, Data0), Replies}
-    end.
+    HCWorkers = HCWorkers0#{channel := CHCWorkers2},
+    Data3 = Data2#data{hc_workers = HCWorkers},
+    Data = continue_channel_health_check_connected(
+        ChannelId,
+        PreviousChanStatus,
+        CurrentStatus,
+        Data3
+    ),
+    CHCActions = start_channel_health_check_action(ChannelId, NewStatus, PreviousChanStatus, Data),
+    Actions = Replies ++ CHCActions,
+    {keep_state, update_state(Data, Data0), Actions}.
 
 handle_channel_health_check_worker_down_new_channels_and_status(
     ChannelId,

+ 21 - 9
apps/emqx_schema_registry/src/emqx_schema_registry.erl

@@ -44,6 +44,12 @@
     get_serde/1
 ]).
 
+%%-------------------------------------------------------------------------------------------------
+%% Type definitions
+%%-------------------------------------------------------------------------------------------------
+
+-define(BAD_SCHEMA_NAME, <<"bad_schema_name">>).
+
 -type schema() :: #{
     type := serde_type(),
     source := binary(),
@@ -87,6 +93,8 @@ get_schema(SchemaName) ->
         Config ->
             {ok, Config}
     catch
+        throw:#{reason := ?BAD_SCHEMA_NAME} ->
+            {error, not_found};
         throw:not_found ->
             {error, not_found}
     end.
@@ -343,16 +351,20 @@ to_bin(A) when is_atom(A) -> atom_to_binary(A);
 to_bin(B) when is_binary(B) -> B.
 
 schema_name_bin_to_atom(Bin) when size(Bin) > 255 ->
-    throw(
-        iolist_to_binary(
-            io_lib:format(
-                "Name is is too long."
-                " Please provide a shorter name (<= 255 bytes)."
-                " The name that is too long: \"~s\"",
-                [Bin]
-            )
+    Msg = iolist_to_binary(
+        io_lib:format(
+            "Name is is too long."
+            " Please provide a shorter name (<= 255 bytes)."
+            " The name that is too long: \"~s\"",
+            [Bin]
         )
-    );
+    ),
+    Reason = #{
+        kind => validation_error,
+        reason => ?BAD_SCHEMA_NAME,
+        hint => Msg
+    },
+    throw(Reason);
 schema_name_bin_to_atom(Bin) ->
     try
         binary_to_existing_atom(Bin, utf8)

+ 9 - 1
apps/emqx_schema_registry/test/emqx_schema_registry_http_api_SUITE.erl

@@ -407,7 +407,8 @@ t_empty_sparkplug(_Config) ->
     ),
     ok.
 
-%% Tests that we can't create names that are too long and get a decent error message.
+%% Tests that we can't create or lookup names that are too long and get a decent error
+%% message.
 t_name_too_long(Config) ->
     SerdeType = ?config(serde_type, Config),
     SourceBin = ?config(schema_source, Config),
@@ -428,4 +429,11 @@ t_name_too_long(Config) ->
         }},
         request({post, Params})
     ),
+    ?assertMatch(
+        {ok, 404, #{
+            <<"code">> := <<"NOT_FOUND">>,
+            <<"message">> := <<"Schema not found">>
+        }},
+        request({get, SchemaName})
+    ),
     ok.

+ 1 - 1
apps/emqx_schema_validation/src/emqx_schema_validation.app.src

@@ -1,6 +1,6 @@
 {application, emqx_schema_validation, [
     {description, "EMQX Schema Validation"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, [emqx_schema_validation_sup, emqx_schema_validation_registry]},
     {mod, {emqx_schema_validation_app, []}},
     {applications, [

+ 2 - 0
apps/emqx_schema_validation/src/emqx_schema_validation_schema.erl

@@ -259,6 +259,8 @@ do_validate_unique_schema_checks(
 do_validate_unique_schema_checks([_Check | Rest], Seen, Duplicated) ->
     do_validate_unique_schema_checks(Rest, Seen, Duplicated).
 
+validate_unique_topics([]) ->
+    {error, <<"at least one topic filter must be defined">>};
 validate_unique_topics(Topics) ->
     Grouped = maps:groups_from_list(
         fun(T) -> T end,

+ 13 - 0
apps/emqx_schema_validation/test/emqx_schema_validation_tests.erl

@@ -117,6 +117,19 @@ schema_test_() ->
                     )
                 ])
             )},
+        {"topics must be non-empty",
+            ?_assertThrow(
+                {_Schema, [
+                    #{
+                        reason := <<"at least one topic filter must be defined", _/binary>>,
+                        value := [],
+                        kind := validation_error
+                    }
+                ]},
+                parse_and_check([
+                    validation(<<"foo">>, [sql_check()], #{<<"topics">> => []})
+                ])
+            )},
         {"foreach expression is not allowed",
             ?_assertThrow(
                 {_Schema, [

+ 3 - 1
apps/emqx_utils/src/emqx_utils.erl

@@ -290,8 +290,10 @@ do_check_oom([{Val, Max, Reason} | Rest]) ->
     end.
 
 tune_heap_size(#{enable := false}) ->
-    ok;
+    ignore;
 %% If the max_heap_size is set to zero, the limit is disabled.
+tune_heap_size(#{max_heap_size := 0}) ->
+    ignore;
 tune_heap_size(#{max_heap_size := MaxHeapSize}) when MaxHeapSize > 0 ->
     MaxSize =
         case erlang:system_info(wordsize) of

+ 54 - 5
apps/emqx_utils/src/emqx_utils_stream.erl

@@ -49,16 +49,19 @@
 
 %% Streams from .csv data
 -export([
-    csv/1
+    csv/1,
+    csv/2
 ]).
 
--export_type([stream/1]).
+-export_type([stream/1, csv_parse_opts/0]).
 
 %% @doc A stream is essentially a lazy list.
 -type stream_tail(T) :: fun(() -> next(T) | []).
 -type stream(T) :: list(T) | nonempty_improper_list(T, stream_tail(T)) | stream_tail(T).
 -type next(T) :: nonempty_improper_list(T, stream_tail(T)).
 
+-type csv_parse_opts() :: #{nullable => boolean(), filter_null => boolean()}.
+
 -dialyzer(no_improper_lists).
 
 -elvis([{elvis_style, nesting_level, disable}]).
@@ -325,13 +328,42 @@ ets(Cont, ContF) ->
 %% @doc Make a stream out of a .csv binary, where the .csv binary is loaded in all at once.
 %% The .csv binary is assumed to be in UTF-8 encoding and to have a header row.
 -spec csv(binary()) -> stream(map()).
-csv(Bin) when is_binary(Bin) ->
+csv(Bin) ->
+    csv(Bin, #{}).
+
+-spec csv(binary(), csv_parse_opts()) -> stream(map()).
+csv(Bin, Opts) when is_binary(Bin) ->
+    Liner =
+        case Opts of
+            #{nullable := true} ->
+                fun csv_read_nullable_line/1;
+            _ ->
+                fun csv_read_line/1
+        end,
+    Maper =
+        case Opts of
+            #{filter_null := true} ->
+                fun(Headers, Fields) ->
+                    maps:from_list(
+                        lists:filter(
+                            fun({_, Value}) ->
+                                Value =/= undefined
+                            end,
+                            lists:zip(Headers, Fields)
+                        )
+                    )
+                end;
+            _ ->
+                fun(Headers, Fields) ->
+                    maps:from_list(lists:zip(Headers, Fields))
+                end
+        end,
     Reader = fun _Iter(Headers, Lines) ->
-        case csv_read_line(Lines) of
+        case Liner(Lines) of
             {Fields, Rest} ->
                 case length(Fields) == length(Headers) of
                     true ->
-                        User = maps:from_list(lists:zip(Headers, Fields)),
+                        User = Maper(Headers, Fields),
                         [User | fun() -> _Iter(Headers, Rest) end];
                     false ->
                         error(bad_format)
@@ -355,6 +387,23 @@ csv_read_line([Line | Lines]) ->
 csv_read_line([]) ->
     eof.
 
+csv_read_nullable_line([Line | Lines]) ->
+    %% XXX: not support ' ' for the field value
+    Fields = lists:map(
+        fun(Bin) ->
+            case string:trim(Bin, both) of
+                <<>> ->
+                    undefined;
+                Any ->
+                    Any
+            end
+        end,
+        binary:split(Line, [<<",">>], [global])
+    ),
+    {Fields, Lines};
+csv_read_nullable_line([]) ->
+    eof.
+
 do_interleave(_Cont, _, [], []) ->
     [];
 do_interleave(Cont, N, [{N, S} | Rest], Rev) ->

+ 16 - 0
apps/emqx_utils/test/emqx_utils_SUITE.erl

@@ -154,6 +154,22 @@ t_check(_) ->
         emqx_utils:check_oom(Policy)
     ).
 
+t_tune_heap_size(_Config) ->
+    Policy = #{
+        max_mailbox_size => 10,
+        max_heap_size => 1024 * 1024 * 8,
+        enable => true
+    },
+    ?assertEqual(ignore, emqx_utils:tune_heap_size(Policy#{enable := false})),
+    %% Setting it to 0 disables the check.
+    ?assertEqual(ignore, emqx_utils:tune_heap_size(Policy#{max_heap_size := 0})),
+    {max_heap_size, PreviousHeapSize} = process_info(self(), max_heap_size),
+    try
+        ?assertMatch(PreviousHeapSize, emqx_utils:tune_heap_size(Policy))
+    after
+        process_flag(max_heap_size, PreviousHeapSize)
+    end.
+
 t_rand_seed(_) ->
     ?assert(is_tuple(emqx_utils:rand_seed())).
 

+ 1 - 0
changes/ce/feat-13436.en.md

@@ -0,0 +1 @@
+Added the option to add custom request headers to JWKS requests.

+ 1 - 0
changes/ce/fix-13375.en.md

@@ -0,0 +1 @@
+The value infinity has been added as default value to the listener configuration fields max_conn_rate, messages_rate and bytes_rate.

+ 1 - 0
changes/ce/fix-13398.en.md

@@ -0,0 +1 @@
+Fix acl rule clearing when reloading built-in-database for authorization using command line.

+ 1 - 0
changes/ce/fix-13403.en.md

@@ -0,0 +1 @@
+Fixed environment variable config override logging behaviour to avoid logging passwords.

+ 1 - 0
changes/ce/fix-13408.en.md

@@ -0,0 +1 @@
+Fix function_clause crash that occurs when attempting to authenticate with an invalid type of salt or password.

+ 1 - 0
changes/ce/fix-13419.en.md

@@ -0,0 +1 @@
+Fix garbled hints in crash log message when calling /configs API

+ 1 - 0
changes/ce/fix-13422.en.md

@@ -0,0 +1 @@
+Fixed an issue where the option `force_shutdown.max_heap_size` could not be set to 0 to disable this tuning.

+ 1 - 0
changes/ce/fix-13442.en.md

@@ -0,0 +1 @@
+Fixed an issue where the health check interval values of actions/sources were not being taken into account.

+ 1 - 0
changes/ce/perf-13441.en.md

@@ -0,0 +1 @@
+Enhanced CoAP gateway connection mode, UDP connection will always be bound to the corresponding gateway connection through the `clientid`.

+ 1 - 0
changes/ee/breaking-13420.en.md

@@ -0,0 +1 @@
+Added a schema validation that prevents configuring an empty set of topic filters for a Schema Validation.  Any such configurations will have to define at least one topic filter to be valid.  Such configurations, though, are probably very rare, as a Schema Validation with empty topics is essentially the same as having no validation at all.

+ 17 - 0
changes/ee/feat-13386.en.md

@@ -0,0 +1,17 @@
+Added a bootstrap file to batch loading banned data when initializing a single node or cluster, in other words, the import operation is performed only if there is no data in the database.
+
+
+This file is a CSV file with `,` as its delimiter.
+
+The first line of this file must be a header line. All valid headers are listed here:
+- as :: required
+- who :: required
+- by  :: optional
+- reason :: optional
+- at :: optional
+- until :: optional
+
+See the documentation for details on each field.
+
+Each row in the rest of this file must contain the same number of columns as the header line,
+and column can be omitted then its value will be `undefined`.

+ 1 - 0
changes/ee/fix-13420.en.md

@@ -0,0 +1 @@
+Added a schema validation to forbid empty topic filter lists when configuring a Schema Validation.

+ 1 - 0
changes/fix-13412.en.md

@@ -0,0 +1 @@
+Fixed an issue in the Prometheus API where the certificate expiration time format incorrectly returned `0` due to the use of `generalTime`.

+ 1 - 0
changes/fix-13432.en.md

@@ -0,0 +1 @@
+Fixed the issue where JWT authentication was silently bypassed when an invalid public key (or invalid public key file path) was used.

+ 3 - 3
deploy/docker/Dockerfile

@@ -2,17 +2,17 @@ ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/5.3-9:1.15.7-26.2.5-3-debian12
 ARG RUN_FROM=public.ecr.aws/debian/debian:stable-20240612-slim
 ARG SOURCE_TYPE=src # tgz
 
-FROM ${BUILD_FROM} as builder_src
+FROM ${BUILD_FROM} AS builder_src
 ONBUILD COPY . /emqx
 
-FROM ${RUN_FROM} as builder_tgz
+FROM ${RUN_FROM} AS builder_tgz
 ARG PROFILE=emqx
 ARG PKG_VSN
 ARG SUFFIX
 ARG TARGETARCH
 ONBUILD COPY ${PROFILE}-${PKG_VSN}${SUFFIX}-debian12-$TARGETARCH.tar.gz /${PROFILE}.tar.gz
 
-FROM builder_${SOURCE_TYPE} as builder
+FROM builder_${SOURCE_TYPE} AS builder
 
 ARG PROFILE=emqx
 ARG IS_ELIXIR=no

+ 2 - 2
mix.exs

@@ -182,9 +182,9 @@ defmodule EMQXUmbrella.MixProject do
   end
 
   def common_dep(:ekka), do: {:ekka, github: "emqx/ekka", tag: "0.19.5", override: true}
-  def common_dep(:esockd), do: {:esockd, github: "emqx/esockd", tag: "5.11.2", override: true}
+  def common_dep(:esockd), do: {:esockd, github: "emqx/esockd", tag: "5.11.3", override: true}
   def common_dep(:gproc), do: {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true}
-  def common_dep(:hocon), do: {:hocon, github: "emqx/hocon", tag: "0.42.2", override: true}
+  def common_dep(:hocon), do: {:hocon, github: "emqx/hocon", tag: "0.43.1", override: true}
   def common_dep(:lc), do: {:lc, github: "emqx/lc", tag: "0.3.2", override: true}
   # in conflict by ehttpc and emqtt
   def common_dep(:gun), do: {:gun, github: "emqx/gun", tag: "1.3.11", override: true}

+ 2 - 2
rebar.config

@@ -82,7 +82,7 @@
     {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
     {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}},
     {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
-    {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}},
+    {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.3"}}},
     {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-6"}}},
     {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}},
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
@@ -98,7 +98,7 @@
     {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.5"}}},
     {getopt, "1.0.2"},
     {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.10"}}},
-    {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.42.2"}}},
+    {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.1"}}},
     {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
     {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.1"}}},
     {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}},

+ 5 - 0
rel/i18n/emqx_authn_jwt_schema.hocon

@@ -152,4 +152,9 @@ disconnect_after_expire.desc:
 disconnect_after_expire.label:
 """Disconnect After Expire"""
 
+jwks_headers.label:
+"""HTTP Headers"""
+jwks_headers.desc:
+"""List of HTTP headers to send with the JWKS request."""
+
 }

+ 5 - 0
rel/i18n/emqx_authz_schema.hocon

@@ -78,6 +78,11 @@ failed.desc:
 failed.label:
 """Failed"""
 
+ignore.desc:
+"""Count of query ignored.  This counter is increased whenever the authorization source attempts to authorize a request, but either it's not applicable, or an error was encountered and the result is undecidable"""
+ignore.label:
+"""Ignored"""
+
 metrics.desc:
 """The metrics of the resource."""
 

+ 9 - 0
rel/i18n/emqx_gateway_schema.hocon

@@ -195,4 +195,13 @@ Relevant when the EMQX cluster is deployed behind a load-balancer."""
 fields_ws_opts_proxy_address_header.label:
 """Proxy address header"""
 
+udp_health_check.desc:
+"""Some Cloud platform use a `request-reply` mechanism to check whether a UDP port is healthy, here can configure this pair."""
+
+udp_health_check_request.desc:
+"""The content of the request."""
+
+udp_health_check_reply.desc:
+"""The content to reply."""
+
 }

+ 19 - 1
rel/i18n/emqx_schema.hocon

@@ -801,7 +801,7 @@ mqtt_max_topic_levels.label:
 """Max Topic Levels"""
 
 force_shutdown_max_heap_size.desc:
-"""Total heap size"""
+"""Total heap size.  Setting this to 0 disables this limitation."""
 
 force_shutdown_max_heap_size.label:
 """Total heap size"""
@@ -1630,4 +1630,22 @@ client_attrs_init_set_as_attr {
     The extracted attribute will be stored in the `client_attrs` property with this name."""
 }
 
+banned_bootstrap_file.desc:
+"""The bootstrap file is a CSV file used to batch loading banned data when initializing a single node or cluster, in other words, the import operation is performed only if there is no data in the database.
+
+The delimiter for this file is `,`.
+
+The first line of this file must be a header line. All valid headers are listed here:
+- as :: required
+- who :: required
+- by  :: optional
+- reason :: optional
+- at :: optional
+- until :: optional
+
+See the documentation for details on each field.
+
+Each row in the rest of this file must contain the same number of columns as the header line,
+and column can be omitted then its value will be `undefined`."""
+
 }

+ 34 - 18
scripts/ui-tests/dashboard_test.py

@@ -3,6 +3,7 @@ import time
 import unittest
 import pytest
 import requests
+import logging
 from urllib.parse import urljoin
 from selenium import webdriver
 from selenium.webdriver.common.by import By
@@ -12,6 +13,9 @@ from selenium.webdriver.support.wait import WebDriverWait
 from selenium.webdriver.common import utils
 from selenium.common.exceptions import NoSuchElementException
 
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+
 @pytest.fixture
 def driver():
     options = Options()
@@ -31,39 +35,52 @@ def dashboard_url(dashboard_host, dashboard_port):
         time.sleep(1)
     return f"http://{dashboard_host}:{dashboard_port}"
 
-@pytest.fixture
 def login(driver, dashboard_url):
     # admin is set in CI jobs, hence as default value
     password = os.getenv("EMQX_DASHBOARD__DEFAULT_PASSWORD", "admin")
     driver.get(dashboard_url)
     assert "EMQX Dashboard" == driver.title
     assert f"{dashboard_url}/#/login?to=/dashboard/overview" == driver.current_url
-    driver.find_element(By.XPATH, "//div[@class='login']//form[1]//input[@type='text']").send_keys("admin")
-    driver.find_element(By.XPATH, "//div[@class='login']//form[1]//input[@type='password']").send_keys(password)
-    driver.find_element(By.XPATH, "//div[@class='login']//form[1]//button[1]").click()
+    driver.execute_script("window.localStorage.setItem('licenseTipVisible','false');")
+    driver.find_element(By.XPATH, "//div[@class='login']//form//input[@type='text']").send_keys("admin")
+    driver.find_element(By.XPATH, "//div[@class='login']//form//input[@type='password']").send_keys(password)
+    driver.find_element(By.XPATH, "//div[@class='login']//form//button").click()
     dest_url = urljoin(dashboard_url, "/#/dashboard/overview")
-    driver.get(dest_url)
     ensure_current_url(driver, dest_url)
+    assert len(driver.find_elements(By.XPATH, "//div[@class='login']")) == 0
+    logger.info(f"Logged in to {dashboard_url}")
 
-def ensure_current_url(driver, url):
+def ensure_current_url(d, url):
     count = 0
-    while url != driver.current_url:
+    while url != d.current_url:
         if count == 10:
             raise Exception(f"Failed to load {url}")
         count += 1
         time.sleep(1)
 
-def title(driver):
-    return driver.find_element("xpath", "//div[@id='app']//h1[@class='header-title']")
+def title(d):
+    title = ''
+    for _ in range(5):
+        try:
+            title = d.find_element("xpath", "//div[@id='app']//h1[@class='header-title']")
+            break
+        except NoSuchElementException:
+            time.sleep(1)
+    else:
+        raise AssertionError("Cannot find the title element")
+    return title
 
-def wait_title_text(driver, text):
-    return WebDriverWait(driver, 10).until(lambda x: title(x).text == text)
+def wait_title_text(d, text):
+    return WebDriverWait(d, 10).until(lambda x: title(x).text == text)
 
-def test_basic(driver, login, dashboard_url):
-    driver.get(dashboard_url)
+def test_basic(driver, dashboard_url):
+    login(driver, dashboard_url)
+    logger.info(f"Current URL: {driver.current_url}")
     wait_title_text(driver, "Cluster Overview")
 
-def test_log(driver, login, dashboard_url):
+def test_log(driver, dashboard_url):
+    login(driver, dashboard_url)
+    logger.info(f"Current URL: {driver.current_url}")
     dest_url = urljoin(dashboard_url, "/#/log")
     driver.get(dest_url)
     ensure_current_url(driver, dest_url)
@@ -95,10 +112,9 @@ def fetch_version(url):
     version_str = info['rel_vsn']
     return parse_version(version_str)
 
-def test_docs_link(driver, login, dashboard_url):
-    dest_url = urljoin(dashboard_url, "/#/dashboard/overview")
-    driver.get(dest_url)
-    ensure_current_url(driver, dest_url)
+def test_docs_link(driver, dashboard_url):
+    login(driver, dashboard_url)
+    logger.info(f"Current URL: {driver.current_url}")
     xpath_link_help = "//div[@id='app']//div[@class='nav-header']//a[contains(@class, 'link-help')]"
     # retry up to 5 times
     for _ in range(5):

+ 2 - 2
scripts/ui-tests/docker-compose.yaml

@@ -1,14 +1,14 @@
-version: '3.9'
-
 services:
   emqx:
     image: ${_EMQX_DOCKER_IMAGE_TAG:-emqx/emqx:latest}
     environment:
       EMQX_DASHBOARD__DEFAULT_PASSWORD: admin
+      EMQX_LOG__CONSOLE__LEVEL: debug
 
   selenium:
     shm_size: '2gb'
     image: ghcr.io/emqx/selenium-chrome:1.0.0
+    platform: linux/amd64
     volumes:
       - ./:/app
     depends_on: