Просмотр исходного кода

Merge pull request #12637 from lafirest/merge-55

sync release-55 to master
lafirest 1 год назад
Родитель
Сommit
c2dcb507cf

+ 8 - 3
apps/emqx/src/emqx_topic.erl

@@ -269,10 +269,11 @@ do_join(_TopicAcc, [C | Words]) when ?MULTI_LEVEL_WILDCARD_NOT_LAST(C, Words) ->
 do_join(TopicAcc, [Word | Words]) ->
     do_join(<<TopicAcc/binary, "/", (bin(Word))/binary>>, Words).
 
--spec parse(topic() | {topic(), map()}) -> {topic() | share(), map()}.
-parse(TopicFilter) when is_binary(TopicFilter) ->
+-spec parse(TF | {TF, map()}) -> {TF, map()} when
+    TF :: topic() | share().
+parse(TopicFilter) when ?IS_TOPIC(TopicFilter) ->
     parse(TopicFilter, #{});
-parse({TopicFilter, Options}) when is_binary(TopicFilter) ->
+parse({TopicFilter, Options}) when ?IS_TOPIC(TopicFilter) ->
     parse(TopicFilter, Options).
 
 -spec parse(topic() | share(), map()) -> {topic() | share(), map()}.
@@ -282,6 +283,10 @@ parse(#share{topic = Topic = <<?QUEUE, "/", _/binary>>}, _Options) ->
     error({invalid_topic_filter, Topic});
 parse(#share{topic = Topic = <<?SHARE, "/", _/binary>>}, _Options) ->
     error({invalid_topic_filter, Topic});
+parse(#share{} = T, #{nl := 1} = _Options) ->
+    %% Protocol Error and Should Disconnect
+    %% MQTT-5.0 [MQTT-3.8.3-4] and [MQTT-4.13.1-1]
+    error({invalid_subopts_nl, maybe_format_share(T)});
 parse(<<?QUEUE, "/", Topic/binary>>, Options) ->
     parse(#share{group = <<?QUEUE>>, topic = Topic}, Options);
 parse(TopicFilter = <<?SHARE, "/", Rest/binary>>, Options) ->

+ 1 - 23
apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl

@@ -859,25 +859,10 @@ maybe_retry({error, Reason}, Context, ReplyFunAndArgs) ->
 maybe_retry(Result, _Context, ReplyFunAndArgs) ->
     emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).
 
-%% The HOCON schema system may generate sensitive keys with this format
-is_sensitive_key(Atom) when is_atom(Atom) ->
-    is_sensitive_key(erlang:atom_to_binary(Atom));
-is_sensitive_key(Bin) when is_binary(Bin), (size(Bin) =:= 19 orelse size(Bin) =:= 13) ->
-    %% We want to convert this to lowercase since the http header fields
-    %% are case insensitive, which means that a user of the Webhook bridge
-    %% can write this field name in many different ways.
-    case try_bin_to_lower(Bin) of
-        <<"authorization">> -> true;
-        <<"proxy-authorization">> -> true;
-        _ -> false
-    end;
-is_sensitive_key(_) ->
-    false.
-
 %% Function that will do a deep traversal of Data and remove sensitive
 %% information (i.e., passwords)
 redact(Data) ->
-    emqx_utils:redact(Data, fun is_sensitive_key/1).
+    emqx_utils:redact(Data).
 
 %% because the body may contain some sensitive data
 %% and at the same time the redact function will not scan the binary data
@@ -901,13 +886,6 @@ redact_test_() ->
         ]
     },
     [
-        ?_assert(is_sensitive_key(<<"Authorization">>)),
-        ?_assert(is_sensitive_key(<<"AuthoriZation">>)),
-        ?_assert(is_sensitive_key('AuthoriZation')),
-        ?_assert(is_sensitive_key(<<"PrOxy-authoRizaTion">>)),
-        ?_assert(is_sensitive_key('PrOxy-authoRizaTion')),
-        ?_assertNot(is_sensitive_key(<<"Something">>)),
-        ?_assertNot(is_sensitive_key(89)),
         ?_assertNotEqual(TestData, redact(TestData))
     ].
 

+ 3 - 1
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl

@@ -387,6 +387,8 @@ on_get_channel_status(InstanceId, _ChannelId, State) ->
 get_payload(#{payload := Payload}) ->
     Payload;
 get_payload(#{<<"payload">> := Payload}) ->
+    Payload;
+get_payload(Payload) ->
     Payload.
 
 parse_payload(ParsedPayload) when is_map(ParsedPayload) ->
@@ -714,7 +716,7 @@ render_channel_message(#{is_aligned := IsAligned} = Channel, IoTDBVsn, Message)
         DeviceId ->
             case get_data_template(Channel, Payloads) of
                 [] ->
-                    {error, invalid_data};
+                    {error, invalid_template};
                 DataTemplate ->
                     case proc_data(DataTemplate, Message) of
                         {ok, DataList} ->

+ 4 - 4
apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl

@@ -503,11 +503,11 @@ t_extract_device_id_from_rule_engine_message(Config) ->
     ),
     ok.
 
-t_sync_invalid_data(Config) ->
+t_sync_invalid_template(Config) ->
     emqx_bridge_v2_testlib:t_sync_query(
         Config,
         make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}),
-        is_error_check(invalid_data),
+        is_error_check(invalid_template),
         iotdb_bridge_on_query
     ).
 
@@ -519,11 +519,11 @@ t_async_device_id_missing(Config) ->
         iotdb_bridge_on_query_async
     ).
 
-t_async_invalid_data(Config) ->
+t_async_invalid_template(Config) ->
     emqx_bridge_v2_testlib:t_async_query(
         Config,
         make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}),
-        is_error_check(invalid_data),
+        is_error_check(invalid_template),
         iotdb_bridge_on_query_async
     ).
 

+ 6 - 3
apps/emqx_connector/src/emqx_connector_resource.erl

@@ -137,7 +137,7 @@ create(Type, Name, Conf0, Opts) ->
         msg => "create connector",
         type => Type,
         name => Name,
-        config => emqx_utils:redact(Conf0)
+        config => redact(Conf0, Type)
     }),
     TypeBin = bin(Type),
     ResourceId = resource_id(Type, Name),
@@ -175,7 +175,7 @@ update(Type, Name, {OldConf, Conf0}, Opts) ->
                 msg => "update connector",
                 type => Type,
                 name => Name,
-                config => emqx_utils:redact(Conf)
+                config => redact(Conf, Type)
             }),
             case recreate(Type, Name, Conf, Opts) of
                 {ok, _} ->
@@ -185,7 +185,7 @@ update(Type, Name, {OldConf, Conf0}, Opts) ->
                         msg => "updating_a_non_existing_connector",
                         type => Type,
                         name => Name,
-                        config => emqx_utils:redact(Conf)
+                        config => redact(Conf, Type)
                     }),
                     create(Type, Name, Conf, Opts);
                 {error, Reason} ->
@@ -379,3 +379,6 @@ override_start_after_created(Config, Opts) ->
 
 set_no_buffer_workers(Opts) ->
     Opts#{spawn_buffer_workers => false}.
+
+redact(Conf, _Type) ->
+    emqx_utils:redact(Conf).

+ 32 - 14
apps/emqx_ldap/src/emqx_ldap.erl

@@ -39,11 +39,12 @@
 
 -export([namespace/0, roots/0, fields/1, desc/1]).
 
--export([do_get_status/1]).
+-export([do_get_status/1, get_status_with_poolname/1]).
 
 -define(LDAP_HOST_OPTIONS, #{
     default_port => 389
 }).
+-define(REDACT_VAL, "******").
 
 -type params_tokens() :: #{atom() => list()}.
 -type state() ::
@@ -154,18 +155,19 @@ on_start(
             false ->
                 Config2
         end,
+
     Options = [
         {pool_size, PoolSize},
         {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
         {options, Config3}
     ],
 
-    case emqx_resource_pool:start(InstId, ?MODULE, Options) of
+    case emqx_resource_pool:start(InstId, ?MODULE, [{log_tag, "eldap_info"} | Options]) of
         ok ->
             emqx_ldap_bind_worker:on_start(
                 InstId,
                 Config,
-                Options,
+                [{log_tag, "eldap_bind_info"} | Options],
                 prepare_template(Config, #{pool_name => InstId})
             );
         {error, Reason} ->
@@ -193,7 +195,15 @@ on_query(InstId, {query, Data, Attrs, Timeout}, State) ->
 on_query(InstId, {bind, _DN, _Data} = Req, State) ->
     emqx_ldap_bind_worker:on_query(InstId, Req, State).
 
-on_get_status(_InstId, #{pool_name := PoolName} = _State) ->
+on_get_status(InstId, #{pool_name := PoolName} = State) ->
+    case get_status_with_poolname(PoolName) of
+        connected ->
+            emqx_ldap_bind_worker:on_get_status(InstId, State);
+        disconnected ->
+            disconnected
+    end.
+
+get_status_with_poolname(PoolName) ->
     case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of
         true ->
             connected;
@@ -209,7 +219,7 @@ do_get_status(Conn) ->
     %% if the server is down, the result is {error, ldap_closed}
     %% otherwise is {error, invalidDNSyntax/timeout}
     {error, ldap_closed} =/=
-        eldap:search(Conn, [{base, "checkalive"}, {filter, eldap:'approxMatch'("", "")}]).
+        eldap:search(Conn, [{base, "cn=checkalive"}, {filter, eldap:'approxMatch'("", "")}]).
 
 %% ===================================================================
 
@@ -222,7 +232,8 @@ connect(Options) ->
     } =
         Conf = proplists:get_value(options, Options),
     OpenOpts = maps:to_list(maps:with([port, sslopts], Conf)),
-    case eldap:open([Host], [{log, fun log/3}, {timeout, RequestTimeout} | OpenOpts]) of
+    LogTag = proplists:get_value(log_tag, Options),
+    case eldap:open([Host], [{log, mk_log_func(LogTag)}, {timeout, RequestTimeout} | OpenOpts]) of
         {ok, Handle} = Ret ->
             %% TODO: teach `eldap` to accept 0-arity closures as passwords.
             case eldap:simple_bind(Handle, Username, emqx_secret:unwrap(Password)) of
@@ -313,14 +324,21 @@ do_ldap_query(
     end.
 
 %% Note: the value of the `_Level` here always is 2
-log(_Level, Format, Args) ->
-    ?SLOG(
-        info,
-        #{
-            msg => "eldap_info",
-            log => io_lib:format(Format, Args)
-        }
-    ).
+mk_log_func(LogTag) ->
+    fun(_Level, Format, Args) ->
+        ?SLOG(
+            info,
+            #{
+                msg => LogTag,
+                log => io_lib:format(Format, [redact_ldap_log(Arg) || Arg <- Args])
+            }
+        )
+    end.
+
+redact_ldap_log({'BindRequest', Version, Name, {simple, _}}) ->
+    {'BindRequest', Version, Name, {simple, ?REDACT_VAL}};
+redact_ldap_log(Arg) ->
+    Arg.
 
 prepare_template(Config, State) ->
     maps:fold(fun prepare_template/3, State, Config).

+ 7 - 1
apps/emqx_ldap/src/emqx_ldap_bind_worker.erl

@@ -24,7 +24,8 @@
 -export([
     on_start/4,
     on_stop/2,
-    on_query/3
+    on_query/3,
+    on_get_status/2
 ]).
 
 %% ecpool connect & reconnect
@@ -103,6 +104,11 @@ on_query(
             {error, {unrecoverable_error, Reason}}
     end.
 
+on_get_status(_InstId, #{bind_pool_name := PoolName}) ->
+    emqx_ldap:get_status_with_poolname(PoolName);
+on_get_status(_InstId, _) ->
+    connected.
+
 %% ===================================================================
 
 connect(Conf) ->

+ 28 - 12
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -774,6 +774,12 @@ subscribe(#{clientid := ClientID, topic := Topic} = Sub) ->
     case do_subscribe(ClientID, Topic, Opts) of
         {error, channel_not_found} ->
             {404, ?CLIENTID_NOT_FOUND};
+        {error, invalid_subopts_nl} ->
+            {400, #{
+                code => <<"INVALID_PARAMETER">>,
+                message =>
+                    <<"Invalid Subscribe options: `no_local` not allowed for shared-sub. See [MQTT-3.8.3-4]">>
+            }};
         {error, Reason} ->
             Message = list_to_binary(io_lib:format("~p", [Reason])),
             {500, #{code => <<"UNKNOWN_ERROR">>, message => Message}};
@@ -797,10 +803,13 @@ subscribe_batch(#{clientid := ClientID, topics := Topics}) ->
     end.
 
 unsubscribe(#{clientid := ClientID, topic := Topic}) ->
+    {NTopic, _} = emqx_topic:parse(Topic),
     case do_unsubscribe(ClientID, Topic) of
         {error, channel_not_found} ->
             {404, ?CLIENTID_NOT_FOUND};
-        {unsubscribe, [{Topic, #{}}]} ->
+        {unsubscribe, [{UnSubedT, #{}}]} when
+            (UnSubedT =:= NTopic) orelse (UnSubedT =:= Topic)
+        ->
             {204}
     end.
 
@@ -817,18 +826,25 @@ unsubscribe_batch(#{clientid := ClientID, topics := Topics}) ->
 %% internal function
 
 do_subscribe(ClientID, Topic0, Options) ->
-    {Topic, Opts} = emqx_topic:parse(Topic0, Options),
-    TopicTable = [{Topic, Opts}],
-    case emqx_mgmt:subscribe(ClientID, TopicTable) of
-        {error, Reason} ->
-            {error, Reason};
-        {subscribe, Subscriptions, Node} ->
-            case proplists:is_defined(Topic, Subscriptions) of
-                true ->
-                    {ok, Options#{node => Node, clientid => ClientID, topic => Topic}};
-                false ->
-                    {error, unknow_error}
+    try emqx_topic:parse(Topic0, Options) of
+        {Topic, Opts} ->
+            TopicTable = [{Topic, Opts}],
+            case emqx_mgmt:subscribe(ClientID, TopicTable) of
+                {error, Reason} ->
+                    {error, Reason};
+                {subscribe, Subscriptions, Node} ->
+                    case proplists:is_defined(Topic, Subscriptions) of
+                        true ->
+                            {ok, Options#{node => Node, clientid => ClientID, topic => Topic0}};
+                        false ->
+                            {error, unknow_error}
+                    end
             end
+    catch
+        error:{invalid_subopts_nl, _} ->
+            {error, invalid_subopts_nl};
+        _:Reason ->
+            {error, Reason}
     end.
 
 -spec do_unsubscribe(emqx_types:clientid(), emqx_types:topic()) ->

+ 129 - 0
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -17,9 +17,12 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/emqx_router.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx/include/asserts.hrl").
 
 all() ->
     AllTCs = emqx_common_test_helpers:all(?MODULE),
@@ -758,6 +761,132 @@ t_client_id_not_found(_Config) ->
         {error, {Http, _, Body}}, PostFun(post, PathFun(["unsubscribe", "bulk"]), [UnsubBody])
     ).
 
+t_subscribe_shared_topic(_Config) ->
+    ClientId = <<"client_subscribe_shared">>,
+
+    {ok, C} = emqtt:start_link(#{clientid => ClientId}),
+    {ok, _} = emqtt:connect(C),
+
+    ClientPuber = <<"publish_client">>,
+    {ok, PC} = emqtt:start_link(#{clientid => ClientPuber}),
+    {ok, _} = emqtt:connect(PC),
+
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+
+    Http200 = {"HTTP/1.1", 200, "OK"},
+    Http204 = {"HTTP/1.1", 204, "No Content"},
+
+    PathFun = fun(Suffix) ->
+        emqx_mgmt_api_test_util:api_path(["clients", "client_subscribe_shared"] ++ Suffix)
+    end,
+
+    PostFun = fun(Method, Path, Data) ->
+        emqx_mgmt_api_test_util:request_api(
+            Method, Path, "", AuthHeader, Data, #{return_all => true}
+        )
+    end,
+
+    SharedT = <<"$share/group/testtopic">>,
+    NonSharedT = <<"t/#">>,
+
+    SubBodyFun = fun(T) -> #{topic => T, qos => 1, nl => 0, rh => 1} end,
+    UnSubBodyFun = fun(T) -> #{topic => T} end,
+
+    %% ====================
+    %% Client Subscribe
+    ?assertMatch(
+        {ok, {Http200, _, _}},
+        PostFun(post, PathFun(["subscribe"]), SubBodyFun(SharedT))
+    ),
+    ?assertMatch(
+        {ok, {Http200, _, _}},
+        PostFun(
+            post,
+            PathFun(["subscribe", "bulk"]),
+            [SubBodyFun(T) || T <- [SharedT, NonSharedT]]
+        )
+    ),
+
+    %% assert subscription
+    ?assertMatch(
+        [
+            {_, #share{group = <<"group">>, topic = <<"testtopic">>}},
+            {_, <<"t/#">>}
+        ],
+        ets:tab2list(?SUBSCRIPTION)
+    ),
+
+    ?assertMatch(
+        [
+            {{#share{group = <<"group">>, topic = <<"testtopic">>}, _}, #{
+                nl := 0, qos := 1, rh := 1, rap := 0
+            }},
+            {{<<"t/#">>, _}, #{nl := 0, qos := 1, rh := 1, rap := 0}}
+        ],
+        ets:tab2list(?SUBOPTION)
+    ),
+    ?assertMatch(
+        [{emqx_shared_subscription, <<"group">>, <<"testtopic">>, _}],
+        ets:tab2list(emqx_shared_subscription)
+    ),
+
+    %% assert subscription virtual
+    _ = emqtt:publish(PC, <<"testtopic">>, <<"msg1">>, [{qos, 0}]),
+    ?assertReceive({publish, #{topic := <<"testtopic">>, payload := <<"msg1">>}}),
+    _ = emqtt:publish(PC, <<"t/1">>, <<"msg2">>, [{qos, 0}]),
+    ?assertReceive({publish, #{topic := <<"t/1">>, payload := <<"msg2">>}}),
+
+    %% ====================
+    %% Client Unsubscribe
+    ?assertMatch(
+        {ok, {Http204, _, _}},
+        PostFun(post, PathFun(["unsubscribe"]), UnSubBodyFun(SharedT))
+    ),
+    ?assertMatch(
+        {ok, {Http204, _, _}},
+        PostFun(
+            post,
+            PathFun(["unsubscribe", "bulk"]),
+            [UnSubBodyFun(T) || T <- [SharedT, NonSharedT]]
+        )
+    ),
+
+    %% assert subscription
+    ?assertEqual([], ets:tab2list(?SUBSCRIPTION)),
+    ?assertEqual([], ets:tab2list(?SUBOPTION)),
+    ?assertEqual([], ets:tab2list(emqx_shared_subscription)),
+
+    %% assert subscription virtual
+    _ = emqtt:publish(PC, <<"testtopic">>, <<"msg3">>, [{qos, 0}]),
+    _ = emqtt:publish(PC, <<"t/1">>, <<"msg4">>, [{qos, 0}]),
+    ?assertNotReceive({publish, #{topic := <<"testtopic">>, payload := <<"msg3">>}}),
+    ?assertNotReceive({publish, #{topic := <<"t/1">>, payload := <<"msg4">>}}).
+
+t_subscribe_shared_topic_nl(_Config) ->
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Http400 = {"HTTP/1.1", 400, "Bad Request"},
+    Body =
+        "{\"code\":\"INVALID_PARAMETER\","
+        "\"message\":\"Invalid Subscribe options: `no_local` not allowed for shared-sub. See [MQTT-3.8.3-4]\"}",
+    ClientId = <<"client_subscribe_shared">>,
+
+    {ok, C} = emqtt:start_link(#{clientid => ClientId}),
+    {ok, _} = emqtt:connect(C),
+
+    PathFun = fun(Suffix) ->
+        emqx_mgmt_api_test_util:api_path(["clients", "client_subscribe_shared"] ++ Suffix)
+    end,
+    PostFun = fun(Method, Path, Data) ->
+        emqx_mgmt_api_test_util:request_api(
+            Method, Path, "", AuthHeader, Data, #{return_all => true}
+        )
+    end,
+    T = <<"$share/group/testtopic">>,
+    ?assertMatch(
+        {error, {Http400, _, Body}},
+        PostFun(post, PathFun(["subscribe"]), #{topic => T, qos => 1, nl => 1, rh => 1})
+    ).
+
 time_string_to_epoch_millisecond(DateTime) ->
     time_string_to_epoch(DateTime, millisecond).
 

+ 48 - 31
apps/emqx_prometheus/src/emqx_prometheus.erl

@@ -834,8 +834,7 @@ cert_data(undefined) ->
 cert_data(AllListeners) ->
     Points = lists:foldl(
         fun(ListenerType, PointsAcc) ->
-            PointsAcc ++
-                points_of_listeners(ListenerType, AllListeners)
+            lists:append(PointsAcc, points_of_listeners(ListenerType, AllListeners))
         end,
         _PointsInitAcc = [],
         ?LISTENER_TYPES
@@ -847,53 +846,71 @@ cert_data(AllListeners) ->
 points_of_listeners(Type, AllListeners) ->
     do_points_of_listeners(Type, maps:get(Type, AllListeners, undefined)).
 
--define(CERT_TYPES, [certfile]).
-
--spec do_points_of_listeners(Type, TypeOfListeners) ->
+-spec do_points_of_listeners(Type, Listeners) ->
     [_Point :: {[{LabelKey, LabelValue}], Epoch}]
 when
     Type :: ssl | wss | quic,
-    TypeOfListeners :: #{ListenerName :: atom() => ListenerConf :: map()} | undefined,
+    Listeners :: #{ListenerName :: atom() => ListenerConf :: map()} | undefined,
     LabelKey :: atom(),
     LabelValue :: atom(),
     Epoch :: non_neg_integer().
 do_points_of_listeners(_, undefined) ->
     [];
-do_points_of_listeners(ListenerType, TypeOfListeners) ->
+do_points_of_listeners(Type, Listeners) ->
     lists:foldl(
         fun(Name, PointsAcc) ->
-            lists:foldl(
-                fun(CertType, AccIn) ->
-                    case
-                        emqx_utils_maps:deep_get(
-                            [Name, ssl_options, CertType], TypeOfListeners, undefined
-                        )
-                    of
-                        undefined -> AccIn;
-                        Path -> [gen_point(ListenerType, Name, Path) | AccIn]
-                    end
-                end,
-                [],
-                ?CERT_TYPES
-            ) ++ PointsAcc
+            case
+                emqx_utils_maps:deep_get([Name, enable], Listeners, false) andalso
+                    emqx_utils_maps:deep_get(
+                        [Name, ssl_options, certfile], Listeners, undefined
+                    )
+            of
+                false -> PointsAcc;
+                undefined -> PointsAcc;
+                Path -> [gen_point_cert_expiry_at(Type, Name, Path) | PointsAcc]
+            end
         end,
         [],
-        maps:keys(TypeOfListeners)
+        %% listener names
+        maps:keys(Listeners)
     ).
 
-gen_point(Type, Name, Path) ->
+gen_point_cert_expiry_at(Type, Name, Path) ->
     {[{listener_type, Type}, {listener_name, Name}], cert_expiry_at_from_path(Path)}.
 
 %% TODO: cert manager for more generic utils functions
 cert_expiry_at_from_path(Path0) ->
     Path = emqx_schema:naive_env_interpolation(Path0),
-    {ok, PemBin} = file:read_file(Path),
-    [CertEntry | _] = public_key:pem_decode(PemBin),
-    Cert = public_key:pem_entry_decode(CertEntry),
-    %% TODO: Not fully tested for all certs type
-    {'utcTime', NotAfterUtc} =
-        Cert#'Certificate'.'tbsCertificate'#'TBSCertificate'.validity#'Validity'.'notAfter',
-    utc_time_to_epoch(NotAfterUtc).
+    try
+        case file:read_file(Path) of
+            {ok, PemBin} ->
+                [CertEntry | _] = public_key:pem_decode(PemBin),
+                Cert = public_key:pem_entry_decode(CertEntry),
+                %% TODO: Not fully tested for all certs type
+                {'utcTime', NotAfterUtc} =
+                    Cert#'Certificate'.'tbsCertificate'#'TBSCertificate'.validity#'Validity'.'notAfter',
+                utc_time_to_epoch(NotAfterUtc);
+            {error, Reason} ->
+                ?SLOG(error, #{
+                    msg => "read_cert_file_failed",
+                    path => Path0,
+                    resolved_path => Path,
+                    reason => Reason
+                }),
+                0
+        end
+    catch
+        E:R:S ->
+            ?SLOG(error, #{
+                msg => "obtain_cert_expiry_time_failed",
+                error => E,
+                reason => R,
+                stacktrace => S,
+                path => Path0,
+                resolved_path => Path
+            }),
+            0
+    end.
 
 utc_time_to_epoch(UtcTime) ->
     date_to_expiry_epoch(utc_time_to_datetime(UtcTime)).
@@ -902,7 +919,7 @@ utc_time_to_datetime(Str) ->
     {ok, [Year, Month, Day, Hour, Minute, Second], _} = io_lib:fread(
         "~2d~2d~2d~2d~2d~2dZ", Str
     ),
-    %% Alwoys Assuming YY is in 2000
+    %% Always Assuming YY is in 2000
     {{2000 + Year, Month, Day}, {Hour, Minute, Second}}.
 
 %% 62167219200 =:= calendar:datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}).

+ 22 - 0
apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl

@@ -1145,6 +1145,7 @@ t_parse_date_errors(_) ->
     ),
 
     %% Compatibility test
+    %% UTC+0
     UnixTs = 1653561612,
     ?assertEqual(
         UnixTs,
@@ -1159,6 +1160,27 @@ t_parse_date_errors(_) ->
     ?assertEqual(
         UnixTs,
         emqx_rule_funcs:date_to_unix_ts(second, <<"%Y-%m-%d %H:%M:%S">>, <<"2022-05-26 10-40-12">>)
+    ),
+
+    %% UTC+0
+    UnixTsLeap0 = 1582986700,
+    ?assertEqual(
+        UnixTsLeap0,
+        emqx_rule_funcs:date_to_unix_ts(second, <<"%Y-%m-%d %H:%M:%S">>, <<"2020-02-29 14:31:40">>)
+    ),
+
+    %% UTC+0
+    UnixTsLeap1 = 1709297071,
+    ?assertEqual(
+        UnixTsLeap1,
+        emqx_rule_funcs:date_to_unix_ts(second, <<"%Y-%m-%d %H:%M:%S">>, <<"2024-03-01 12:44:31">>)
+    ),
+
+    %% UTC+0
+    UnixTsLeap2 = 1709535387,
+    ?assertEqual(
+        UnixTsLeap2,
+        emqx_rule_funcs:date_to_unix_ts(second, <<"%Y-%m-%d %H:%M:%S">>, <<"2024-03-04 06:56:27">>)
     ).
 
 %%------------------------------------------------------------------------------

+ 5 - 237
apps/emqx_utils/src/emqx_utils.erl

@@ -659,146 +659,20 @@ try_to_existing_atom(Convert, Data, Encoding) ->
         _:Reason -> {error, Reason}
     end.
 
-%% NOTE: keep alphabetical order
-is_sensitive_key(aws_secret_access_key) -> true;
-is_sensitive_key("aws_secret_access_key") -> true;
-is_sensitive_key(<<"aws_secret_access_key">>) -> true;
-is_sensitive_key(password) -> true;
-is_sensitive_key("password") -> true;
-is_sensitive_key(<<"password">>) -> true;
-is_sensitive_key('proxy-authorization') -> true;
-is_sensitive_key("proxy-authorization") -> true;
-is_sensitive_key(<<"proxy-authorization">>) -> true;
-is_sensitive_key(secret) -> true;
-is_sensitive_key("secret") -> true;
-is_sensitive_key(<<"secret">>) -> true;
-is_sensitive_key(secret_access_key) -> true;
-is_sensitive_key("secret_access_key") -> true;
-is_sensitive_key(<<"secret_access_key">>) -> true;
-is_sensitive_key(secret_key) -> true;
-is_sensitive_key("secret_key") -> true;
-is_sensitive_key(<<"secret_key">>) -> true;
-is_sensitive_key(security_token) -> true;
-is_sensitive_key("security_token") -> true;
-is_sensitive_key(<<"security_token">>) -> true;
-is_sensitive_key(sp_private_key) -> true;
-is_sensitive_key(<<"sp_private_key">>) -> true;
-is_sensitive_key(token) -> true;
-is_sensitive_key("token") -> true;
-is_sensitive_key(<<"token">>) -> true;
-is_sensitive_key(jwt) -> true;
-is_sensitive_key("jwt") -> true;
-is_sensitive_key(<<"jwt">>) -> true;
-is_sensitive_key(authorization) -> true;
-is_sensitive_key("authorization") -> true;
-is_sensitive_key(<<"authorization">>) -> true;
-is_sensitive_key(bind_password) -> true;
-is_sensitive_key("bind_password") -> true;
-is_sensitive_key(<<"bind_password">>) -> true;
-is_sensitive_key(Key) -> is_authorization(Key).
-
 redact(Term) ->
-    do_redact(Term, fun is_sensitive_key/1).
+    emqx_utils_redact:redact(Term).
 
 redact(Term, Checker) ->
-    do_redact(Term, fun(V) ->
-        is_sensitive_key(V) orelse Checker(V)
-    end).
-
-do_redact([E | Rest], Checker) ->
-    [do_redact(E, Checker) | do_redact(Rest, Checker)];
-do_redact(M, Checker) when is_map(M) ->
-    maps:map(
-        fun(K, V) ->
-            do_redact(K, V, Checker)
-        end,
-        M
-    );
-do_redact({Key, Value}, Checker) ->
-    case Checker(Key) of
-        true ->
-            {Key, redact_v(Value)};
-        false ->
-            {do_redact(Key, Checker), do_redact(Value, Checker)}
-    end;
-do_redact(T, Checker) when is_tuple(T) ->
-    Elements = erlang:tuple_to_list(T),
-    Redact = do_redact(Elements, Checker),
-    erlang:list_to_tuple(Redact);
-do_redact(Any, _Checker) ->
-    Any.
-
-do_redact(K, V, Checker) ->
-    case Checker(K) of
-        true ->
-            redact_v(V);
-        false ->
-            do_redact(V, Checker)
-    end.
-
--define(REDACT_VAL, "******").
-redact_v(V) when is_binary(V) -> <<?REDACT_VAL>>;
-%% The HOCON schema system may generate sensitive values with this format
-redact_v([{str, Bin}]) when is_binary(Bin) ->
-    [{str, <<?REDACT_VAL>>}];
-redact_v(_V) ->
-    ?REDACT_VAL.
+    emqx_utils_redact:redact(Term, Checker).
 
 deobfuscate(NewConf, OldConf) ->
-    maps:fold(
-        fun(K, V, Acc) ->
-            case maps:find(K, OldConf) of
-                error ->
-                    case is_redacted(K, V) of
-                        %% don't put redacted value into new config
-                        true -> Acc;
-                        false -> Acc#{K => V}
-                    end;
-                {ok, OldV} when is_map(V), is_map(OldV) ->
-                    Acc#{K => deobfuscate(V, OldV)};
-                {ok, OldV} ->
-                    case is_redacted(K, V) of
-                        true ->
-                            Acc#{K => OldV};
-                        _ ->
-                            Acc#{K => V}
-                    end
-            end
-        end,
-        #{},
-        NewConf
-    ).
+    emqx_utils_redact:deobfuscate(NewConf, OldConf).
 
 is_redacted(K, V) ->
-    do_is_redacted(K, V, fun is_sensitive_key/1).
+    emqx_utils_redact:is_redacted(K, V).
 
 is_redacted(K, V, Fun) ->
-    do_is_redacted(K, V, fun(E) ->
-        is_sensitive_key(E) orelse Fun(E)
-    end).
-
-do_is_redacted(K, ?REDACT_VAL, Fun) ->
-    Fun(K);
-do_is_redacted(K, <<?REDACT_VAL>>, Fun) ->
-    Fun(K);
-do_is_redacted(K, WrappedFun, Fun) when is_function(WrappedFun, 0) ->
-    %% wrapped by `emqx_secret' or other module
-    do_is_redacted(K, WrappedFun(), Fun);
-do_is_redacted(_K, _V, _Fun) ->
-    false.
-
-%% This is ugly, however, the authorization is case-insensitive,
-%% the best way is to check chars one by one and quickly exit when any position is not equal,
-%% but in Erlang, this may not perform well, so here only check the first one
-is_authorization([Cap | _] = Key) when Cap == $a; Cap == $A ->
-    is_authorization2(Key);
-is_authorization(<<Cap, _/binary>> = Key) when Cap == $a; Cap == $A ->
-    is_authorization2(erlang:binary_to_list(Key));
-is_authorization(_Any) ->
-    false.
-
-is_authorization2(Str) ->
-    "authorization" == string:to_lower(Str).
+    emqx_utils_redact:is_redacted(K, V, Fun).
 
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
@@ -812,112 +686,6 @@ ipv6_probe_test() ->
             ok
     end.
 
-redact_test_() ->
-    Case = fun(TypeF, KeyIn) ->
-        Key = TypeF(KeyIn),
-        ?assert(is_sensitive_key(Key)),
-
-        %% direct
-        ?assertEqual({Key, ?REDACT_VAL}, redact({Key, foo})),
-        ?assertEqual(#{Key => ?REDACT_VAL}, redact(#{Key => foo})),
-        ?assertEqual({Key, Key, Key}, redact({Key, Key, Key})),
-        ?assertEqual({[{Key, ?REDACT_VAL}], bar}, redact({[{Key, foo}], bar})),
-
-        %% 1 level nested
-        ?assertEqual([{Key, ?REDACT_VAL}], redact([{Key, foo}])),
-        ?assertEqual([#{Key => ?REDACT_VAL}], redact([#{Key => foo}])),
-
-        %% 2 level nested
-        ?assertEqual(#{opts => [{Key, ?REDACT_VAL}]}, redact(#{opts => [{Key, foo}]})),
-        ?assertEqual(#{opts => #{Key => ?REDACT_VAL}}, redact(#{opts => #{Key => foo}})),
-        ?assertEqual({opts, [{Key, ?REDACT_VAL}]}, redact({opts, [{Key, foo}]})),
-
-        %% 3 level nested
-        ?assertEqual([#{opts => [{Key, ?REDACT_VAL}]}], redact([#{opts => [{Key, foo}]}])),
-        ?assertEqual([{opts, [{Key, ?REDACT_VAL}]}], redact([{opts, [{Key, foo}]}])),
-        ?assertEqual([{opts, [#{Key => ?REDACT_VAL}]}], redact([{opts, [#{Key => foo}]}])),
-
-        %% improper lists
-        ?assertEqual([{opts, [{Key, ?REDACT_VAL} | oops]}], redact([{opts, [{Key, foo} | oops]}]))
-    end,
-    Types = [
-        {atom, fun identity/1},
-        {string, fun emqx_utils_conv:str/1},
-        {binary, fun emqx_utils_conv:bin/1}
-    ],
-    Keys = [
-        authorization,
-        aws_secret_access_key,
-        password,
-        'proxy-authorization',
-        secret,
-        secret_key,
-        secret_access_key,
-        security_token,
-        token,
-        bind_password
-    ],
-    [
-        {case_name(Type, Key), fun() -> Case(TypeF, Key) end}
-     || Key <- Keys,
-        {Type, TypeF} <- Types
-    ].
-
-redact2_test_() ->
-    Case = fun(Key, Checker) ->
-        ?assertEqual({Key, ?REDACT_VAL}, redact({Key, foo}, Checker)),
-        ?assertEqual(#{Key => ?REDACT_VAL}, redact(#{Key => foo}, Checker)),
-        ?assertEqual({Key, Key, Key}, redact({Key, Key, Key}, Checker)),
-        ?assertEqual({[{Key, ?REDACT_VAL}], bar}, redact({[{Key, foo}], bar}, Checker))
-    end,
-
-    Checker = fun(E) -> E =:= passcode end,
-
-    Keys = [secret, passcode],
-    [{case_name(atom, Key), fun() -> Case(Key, Checker) end} || Key <- Keys].
-
-deobfuscate_test() ->
-    NewConf0 = #{foo => <<"bar0">>, password => <<"123456">>},
-    ?assertEqual(NewConf0, deobfuscate(NewConf0, #{foo => <<"bar">>, password => <<"654321">>})),
-
-    NewConf1 = #{foo => <<"bar1">>, password => <<?REDACT_VAL>>},
-    ?assertEqual(
-        #{foo => <<"bar1">>, password => <<"654321">>},
-        deobfuscate(NewConf1, #{foo => <<"bar">>, password => <<"654321">>})
-    ),
-
-    %% Don't have password before and ignore to put redact_val into new config
-    NewConf2 = #{foo => <<"bar2">>, password => ?REDACT_VAL},
-    ?assertEqual(#{foo => <<"bar2">>}, deobfuscate(NewConf2, #{foo => <<"bar">>})),
-
-    %% Don't have password before and should allow put non-redact-val into new config
-    NewConf3 = #{foo => <<"bar3">>, password => <<"123456">>},
-    ?assertEqual(NewConf3, deobfuscate(NewConf3, #{foo => <<"bar">>})),
-    ok.
-
-redact_is_authorization_test_() ->
-    Types = [string, binary],
-    Keys = ["auThorization", "Authorization", "authorizaTion"],
-
-    Case = fun(Type, Key0) ->
-        Key =
-            case Type of
-                binary ->
-                    erlang:list_to_binary(Key0);
-                _ ->
-                    Key0
-            end,
-        ?assert(is_sensitive_key(Key))
-    end,
-
-    [{case_name(Type, Key), fun() -> Case(Type, Key) end} || Key <- Keys, Type <- Types].
-
-case_name(Type, Key) ->
-    lists:concat([Type, "-", Key]).
-
-identity(X) ->
-    X.
-
 -endif.
 
 pub_props_to_packet(Properties) ->

+ 10 - 1
apps/emqx_utils/src/emqx_utils_calendar.erl

@@ -478,7 +478,8 @@ do_parse(DateStr, Unit, Formatter) ->
             (year, V, Res) ->
                 Res + dy(V) * ?SECONDS_PER_DAY * Precise - (?SECONDS_FROM_0_TO_1970 * Precise);
             (month, V, Res) ->
-                Res + dm(V) * ?SECONDS_PER_DAY * Precise;
+                Dm = dym(maps:get(year, DateInfo, 0), V),
+                Res + Dm * ?SECONDS_PER_DAY * Precise;
             (day, V, Res) ->
                 Res + (V * ?SECONDS_PER_DAY * Precise);
             (hour, V, Res) ->
@@ -563,6 +564,14 @@ date_size(timezone) -> 5;
 date_size(timezone1) -> 6;
 date_size(timezone2) -> 9.
 
+dym(Y, M) ->
+    case is_leap_year(Y) of
+        true when M > 2 ->
+            dm(M) + 1;
+        _ ->
+            dm(M)
+    end.
+
 dm(1) -> 0;
 dm(2) -> 31;
 dm(3) -> 59;

+ 312 - 0
apps/emqx_utils/src/emqx_utils_redact.erl

@@ -0,0 +1,312 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_utils_redact).
+
+-export([redact/1, redact/2, is_redacted/2, is_redacted/3]).
+-export([deobfuscate/2]).
+
+-define(REDACT_VAL, "******").
+-define(IS_KEY_HEADERS(K), K == headers; K == <<"headers">>; K == "headers").
+
+%% NOTE: keep alphabetical order
+is_sensitive_key(aws_secret_access_key) -> true;
+is_sensitive_key("aws_secret_access_key") -> true;
+is_sensitive_key(<<"aws_secret_access_key">>) -> true;
+is_sensitive_key(password) -> true;
+is_sensitive_key("password") -> true;
+is_sensitive_key(<<"password">>) -> true;
+is_sensitive_key(secret) -> true;
+is_sensitive_key("secret") -> true;
+is_sensitive_key(<<"secret">>) -> true;
+is_sensitive_key(secret_access_key) -> true;
+is_sensitive_key("secret_access_key") -> true;
+is_sensitive_key(<<"secret_access_key">>) -> true;
+is_sensitive_key(secret_key) -> true;
+is_sensitive_key("secret_key") -> true;
+is_sensitive_key(<<"secret_key">>) -> true;
+is_sensitive_key(security_token) -> true;
+is_sensitive_key("security_token") -> true;
+is_sensitive_key(<<"security_token">>) -> true;
+is_sensitive_key(sp_private_key) -> true;
+is_sensitive_key(<<"sp_private_key">>) -> true;
+is_sensitive_key(token) -> true;
+is_sensitive_key("token") -> true;
+is_sensitive_key(<<"token">>) -> true;
+is_sensitive_key(jwt) -> true;
+is_sensitive_key("jwt") -> true;
+is_sensitive_key(<<"jwt">>) -> true;
+is_sensitive_key(bind_password) -> true;
+is_sensitive_key("bind_password") -> true;
+is_sensitive_key(<<"bind_password">>) -> true;
+is_sensitive_key(_) -> false.
+
+redact(Term) ->
+    do_redact(Term, fun is_sensitive_key/1).
+
+redact(Term, Checker) ->
+    do_redact(Term, fun(V) ->
+        is_sensitive_key(V) orelse Checker(V)
+    end).
+
+do_redact(L, Checker) when is_list(L) ->
+    lists:map(fun(E) -> do_redact(E, Checker) end, L);
+do_redact(M, Checker) when is_map(M) ->
+    maps:map(
+        fun(K, V) ->
+            do_redact(K, V, Checker)
+        end,
+        M
+    );
+do_redact({Headers, Value}, _Checker) when ?IS_KEY_HEADERS(Headers) ->
+    {Headers, do_redact_headers(Value)};
+do_redact({Key, Value}, Checker) ->
+    case Checker(Key) of
+        true ->
+            {Key, redact_v(Value)};
+        false ->
+            {do_redact(Key, Checker), do_redact(Value, Checker)}
+    end;
+do_redact(T, Checker) when is_tuple(T) ->
+    Elements = erlang:tuple_to_list(T),
+    Redact = do_redact(Elements, Checker),
+    erlang:list_to_tuple(Redact);
+do_redact(Any, _Checker) ->
+    Any.
+
+do_redact(Headers, V, _Checker) when ?IS_KEY_HEADERS(Headers) ->
+    do_redact_headers(V);
+do_redact(K, V, Checker) ->
+    case Checker(K) of
+        true ->
+            redact_v(V);
+        false ->
+            do_redact(V, Checker)
+    end.
+
+do_redact_headers(List) when is_list(List) ->
+    lists:map(
+        fun
+            ({K, V} = Pair) ->
+                case check_is_sensitive_header(K) of
+                    true ->
+                        {K, redact_v(V)};
+                    _ ->
+                        Pair
+                end;
+            (Any) ->
+                Any
+        end,
+        List
+    );
+do_redact_headers(Map) when is_map(Map) ->
+    maps:map(
+        fun(K, V) ->
+            case check_is_sensitive_header(K) of
+                true ->
+                    redact_v(V);
+                _ ->
+                    V
+            end
+        end,
+        Map
+    );
+do_redact_headers(Value) ->
+    Value.
+
+check_is_sensitive_header(Key) ->
+    Key1 = emqx_utils_conv:str(Key),
+    is_sensitive_header(string:lowercase(Key1)).
+
+is_sensitive_header("authorization") ->
+    true;
+is_sensitive_header("proxy-authorization") ->
+    true;
+is_sensitive_header(_Any) ->
+    false.
+
+redact_v(V) when is_binary(V) -> <<?REDACT_VAL>>;
+%% The HOCON schema system may generate sensitive values with this format
+redact_v([{str, Bin}]) when is_binary(Bin) ->
+    [{str, <<?REDACT_VAL>>}];
+redact_v(_V) ->
+    ?REDACT_VAL.
+
+deobfuscate(NewConf, OldConf) ->
+    maps:fold(
+        fun(K, V, Acc) ->
+            case maps:find(K, OldConf) of
+                error ->
+                    case is_redacted(K, V) of
+                        %% don't put redacted value into new config
+                        true -> Acc;
+                        false -> Acc#{K => V}
+                    end;
+                {ok, OldV} when is_map(V), is_map(OldV) ->
+                    Acc#{K => deobfuscate(V, OldV)};
+                {ok, OldV} ->
+                    case is_redacted(K, V) of
+                        true ->
+                            Acc#{K => OldV};
+                        _ ->
+                            Acc#{K => V}
+                    end
+            end
+        end,
+        #{},
+        NewConf
+    ).
+
+is_redacted(K, V) ->
+    do_is_redacted(K, V, fun is_sensitive_key/1).
+
+is_redacted(K, V, Fun) ->
+    do_is_redacted(K, V, fun(E) ->
+        is_sensitive_key(E) orelse Fun(E)
+    end).
+
+do_is_redacted(K, ?REDACT_VAL, Fun) ->
+    Fun(K);
+do_is_redacted(K, <<?REDACT_VAL>>, Fun) ->
+    Fun(K);
+do_is_redacted(K, WrappedFun, Fun) when is_function(WrappedFun, 0) ->
+    %% wrapped by `emqx_secret' or other module
+    do_is_redacted(K, WrappedFun(), Fun);
+do_is_redacted(_K, _V, _Fun) ->
+    false.
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+redact_test_() ->
+    Case = fun(Type, KeyT) ->
+        Key =
+            case Type of
+                atom -> KeyT;
+                string -> erlang:atom_to_list(KeyT);
+                binary -> erlang:atom_to_binary(KeyT)
+            end,
+
+        ?assert(is_sensitive_key(Key)),
+
+        %% direct
+        ?assertEqual({Key, ?REDACT_VAL}, redact({Key, foo})),
+        ?assertEqual(#{Key => ?REDACT_VAL}, redact(#{Key => foo})),
+        ?assertEqual({Key, Key, Key}, redact({Key, Key, Key})),
+        ?assertEqual({[{Key, ?REDACT_VAL}], bar}, redact({[{Key, foo}], bar})),
+
+        %% 1 level nested
+        ?assertEqual([{Key, ?REDACT_VAL}], redact([{Key, foo}])),
+        ?assertEqual([#{Key => ?REDACT_VAL}], redact([#{Key => foo}])),
+
+        %% 2 level nested
+        ?assertEqual(#{opts => [{Key, ?REDACT_VAL}]}, redact(#{opts => [{Key, foo}]})),
+        ?assertEqual(#{opts => #{Key => ?REDACT_VAL}}, redact(#{opts => #{Key => foo}})),
+        ?assertEqual({opts, [{Key, ?REDACT_VAL}]}, redact({opts, [{Key, foo}]})),
+
+        %% 3 level nested
+        ?assertEqual([#{opts => [{Key, ?REDACT_VAL}]}], redact([#{opts => [{Key, foo}]}])),
+        ?assertEqual([{opts, [{Key, ?REDACT_VAL}]}], redact([{opts, [{Key, foo}]}])),
+        ?assertEqual([{opts, [#{Key => ?REDACT_VAL}]}], redact([{opts, [#{Key => foo}]}]))
+    end,
+
+    Types = [atom, string, binary],
+    Keys = [
+        aws_secret_access_key,
+        password,
+        secret,
+        secret_key,
+        secret_access_key,
+        security_token,
+        token,
+        bind_password
+    ],
+    [{case_name(Type, Key), fun() -> Case(Type, Key) end} || Key <- Keys, Type <- Types].
+
+redact2_test_() ->
+    Case = fun(Key, Checker) ->
+        ?assertEqual({Key, ?REDACT_VAL}, redact({Key, foo}, Checker)),
+        ?assertEqual(#{Key => ?REDACT_VAL}, redact(#{Key => foo}, Checker)),
+        ?assertEqual({Key, Key, Key}, redact({Key, Key, Key}, Checker)),
+        ?assertEqual({[{Key, ?REDACT_VAL}], bar}, redact({[{Key, foo}], bar}, Checker))
+    end,
+
+    Checker = fun(E) -> E =:= passcode end,
+
+    Keys = [secret, passcode],
+    [{case_name(atom, Key), fun() -> Case(Key, Checker) end} || Key <- Keys].
+
+deobfuscate_test() ->
+    NewConf0 = #{foo => <<"bar0">>, password => <<"123456">>},
+    ?assertEqual(NewConf0, deobfuscate(NewConf0, #{foo => <<"bar">>, password => <<"654321">>})),
+
+    NewConf1 = #{foo => <<"bar1">>, password => <<?REDACT_VAL>>},
+    ?assertEqual(
+        #{foo => <<"bar1">>, password => <<"654321">>},
+        deobfuscate(NewConf1, #{foo => <<"bar">>, password => <<"654321">>})
+    ),
+
+    %% Don't have password before and ignore to put redact_val into new config
+    NewConf2 = #{foo => <<"bar2">>, password => ?REDACT_VAL},
+    ?assertEqual(#{foo => <<"bar2">>}, deobfuscate(NewConf2, #{foo => <<"bar">>})),
+
+    %% Don't have password before and should allow put non-redact-val into new config
+    NewConf3 = #{foo => <<"bar3">>, password => <<"123456">>},
+    ?assertEqual(NewConf3, deobfuscate(NewConf3, #{foo => <<"bar">>})),
+    ok.
+
+redact_header_test_() ->
+    Types = [string, binary, atom],
+    Keys = [
+        "auThorization",
+        "Authorization",
+        "authorizaTion",
+        "proxy-authorizaTion",
+        "proXy-authoriZaTion"
+    ],
+
+    Case = fun(Type, Key0) ->
+        Converter =
+            case Type of
+                binary ->
+                    fun erlang:list_to_binary/1;
+                atom ->
+                    fun erlang:list_to_atom/1;
+                _ ->
+                    fun(Any) -> Any end
+            end,
+
+        Name = Converter("headers"),
+        Key = Converter(Key0),
+        Value = Converter("value"),
+        Value1 = redact_v(Value),
+        ?assertMatch(
+            {Name, [{Key, Value1}]},
+            redact({Name, [{Key, Value}]})
+        ),
+
+        ?assertMatch(
+            #{Name := #{Key := Value1}},
+            redact(#{Name => #{Key => Value}})
+        )
+    end,
+
+    [{case_name(Type, Key), fun() -> Case(Type, Key) end} || Key <- Keys, Type <- Types].
+
+case_name(Type, Key) ->
+    lists:concat([Type, "-", Key]).
+
+-endif.

+ 9 - 0
changes/ce/fix-12598.en.md

@@ -0,0 +1,9 @@
+Fixed an issue that unable to subscribe or unsubscribe a shared topic filter via HTTP API.
+
+Releated APIs:
+
+- `/clients/:clientid/subscribe`
+- `/clients/:clientid/subscribe/bulk`
+
+- `/clients/:clientid/unsubscribe`
+- `/clients/:clientid/unsubscribe/bulk`

+ 1 - 0
changes/ce/fix-12601.en.md

@@ -0,0 +1 @@
+Fixed that the logs of LDAP driver would never be logged, now all of them are logged with `info` level.

+ 1 - 0
changes/ce/fix-12606.en.md

@@ -0,0 +1 @@
+Fix the issue of the endpoint `/prometheus/stats` crashing when the listener's cert file is unreadable.

+ 1 - 0
changes/ce/fix-12620.en.md

@@ -0,0 +1 @@
+Fixed the sensitive headers for HTTP connector may be printed in the `debug` level log.

+ 1 - 0
changes/ce/fix-12632.en.md

@@ -0,0 +1 @@
+Fix incorrect results from rule SQL built-in function `date_to_unix_ts` after March on leap years.

+ 1 - 0
changes/ee/fix-12608.en.md

@@ -0,0 +1 @@
+Fixed a `function_clause` error for IoTDB action when there is no `payload` field in the query data.

+ 1 - 0
changes/ee/fix-12610.en.md

@@ -0,0 +1 @@
+Fixed that the connection to the LDAP connector may be disconnected after a period of time.