Merge pull request #13165 from id/0531-sync-release-57

sync release 57
Ivan Dyachkov, 1 year ago
Commit b232784df2
31 changed files with 237 additions and 65 deletions
  1. .github/workflows/release.yaml (+3 -3)
  2. apps/emqx/src/emqx.app.src (+1 -1)
  3. apps/emqx/src/emqx_logger_jsonfmt.erl (+32 -0)
  4. apps/emqx/src/emqx_schema.erl (+14 -12)
  5. apps/emqx/src/emqx_trace/emqx_trace_formatter.erl (+16 -3)
  6. apps/emqx/test/emqx_schema_tests.erl (+12 -0)
  7. apps/emqx_auth/src/emqx_auth.app.src (+1 -1)
  8. apps/emqx_bridge_azure_event_hub/rebar.config (+1 -1)
  9. apps/emqx_bridge_confluent/rebar.config (+1 -1)
  10. apps/emqx_bridge_kafka/rebar.config (+1 -1)
  11. apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl (+48 -27)
  12. apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src (+1 -1)
  13. apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl (+12 -1)
  14. apps/emqx_license/test/emqx_license_http_api_SUITE.erl (+1 -1)
  15. apps/emqx_management/src/emqx_management.app.src (+1 -1)
  16. apps/emqx_oracle/src/emqx_oracle.app.src (+1 -1)
  17. apps/emqx_oracle/src/emqx_oracle.erl (+11 -1)
  18. apps/emqx_schema_registry/src/emqx_schema_registry.app.src (+1 -1)
  19. apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl (+15 -1)
  20. apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl (+19 -1)
  21. changes/ce/fix-13140.en.md (+1 -0)
  22. changes/ee/fix-13070.en.md (+5 -0)
  23. changes/ee/fix-13079.en.md (+6 -0)
  24. changes/ee/fix-13130.en.md (+1 -0)
  25. changes/ee/fix-13136.en.md (+1 -0)
  26. changes/ee/fix-13147.en.md (+1 -0)
  27. deploy/packages/deb/debian/postrm (+3 -2)
  28. mix.exs (+2 -2)
  29. rebar.config (+1 -1)
  30. scripts/pkg-tests.sh (+15 -0)
  31. scripts/ui-tests/dashboard_test.py (+9 -1)

+ 3 - 3
.github/workflows/release.yaml

@@ -8,7 +8,7 @@ on:
       tag:
         type: string
         required: true
-      publish_release_artefacts:
+      publish_release_artifacts:
         type: boolean
         required: true
         default: false
@@ -75,7 +75,7 @@ jobs:
           tag_name: "${{ env.ref_name }}"
           skip_existing: true
       - name: update to emqx.io
-        if: startsWith(env.ref_name, 'v') && ((github.event_name == 'release' && !github.event.release.prerelease) || inputs.publish_release_artefacts)
+        if: github.event_name == 'release' || inputs.publish_release_artifacts
         run: |
           set -eux
           curl -w %{http_code} \
@@ -86,7 +86,7 @@ jobs:
                -d "{\"repo\":\"emqx/emqx\", \"tag\": \"${{ env.ref_name }}\" }" \
                ${{ secrets.EMQX_IO_RELEASE_API }}
       - name: Push to packagecloud.io
-        if: (github.event_name == 'release' && !github.event.release.prerelease) || inputs.publish_release_artefacts
+        if: (github.event_name == 'release' && !github.event.release.prerelease) || inputs.publish_release_artifacts
         env:
           PROFILE: ${{ steps.profile.outputs.profile }}
           VERSION: ${{ steps.profile.outputs.version }}

+ 1 - 1
apps/emqx/src/emqx.app.src

@@ -2,7 +2,7 @@
 {application, emqx, [
     {id, "emqx"},
     {description, "EMQX Core"},
-    {vsn, "5.4.0"},
+    {vsn, "5.3.1"},
     {modules, []},
     {registered, []},
     {applications, [

+ 32 - 0
apps/emqx/src/emqx_logger_jsonfmt.erl

@@ -270,6 +270,8 @@ json(L, Config) when is_list(L) ->
     end;
 json(Map, Config) when is_map(Map) ->
     best_effort_json_obj(Map, Config);
+json({'$array$', List}, Config) when is_list(List) ->
+    [json(I, Config) || I <- List];
 json(Term, Config) ->
     do_format_msg("~p", [Term], Config).
 
@@ -448,6 +450,36 @@ best_effort_json_test() ->
         <<"[\n  {\n    \"key\" : [\n      \n    ]\n  }\n]">>,
         best_effort_json([#{key => []}])
     ),
+    %% List is IO Data
+    ?assertMatch(
+        #{<<"what">> := <<"hej\n">>},
+        emqx_utils_json:decode(emqx_logger_jsonfmt:best_effort_json(#{what => [<<"hej">>, 10]}))
+    ),
+    %% Force list to be interpreted as an array
+    ?assertMatch(
+        #{<<"what">> := [<<"hej">>, 10]},
+        emqx_utils_json:decode(
+            emqx_logger_jsonfmt:best_effort_json(#{what => {'$array$', [<<"hej">>, 10]}})
+        )
+    ),
+    %% IO Data inside an array
+    ?assertMatch(
+        #{<<"what">> := [<<"hej">>, 10, <<"hej\n">>]},
+        emqx_utils_json:decode(
+            emqx_logger_jsonfmt:best_effort_json(#{
+                what => {'$array$', [<<"hej">>, 10, [<<"hej">>, 10]]}
+            })
+        )
+    ),
+    %% Array inside an array
+    ?assertMatch(
+        #{<<"what">> := [<<"hej">>, 10, [<<"hej">>, 10]]},
+        emqx_utils_json:decode(
+            emqx_logger_jsonfmt:best_effort_json(#{
+                what => {'$array$', [<<"hej">>, 10, {'$array$', [<<"hej">>, 10]}]}
+            })
+        )
+    ),
     ok.
 
 config() ->

+ 14 - 12
apps/emqx/src/emqx_schema.erl

@@ -2487,7 +2487,7 @@ converter_ciphers(Ciphers, _Opts) when is_binary(Ciphers) ->
 
 default_ciphers(Which) ->
     lists:map(
-        fun erlang:iolist_to_binary/1,
+        fun unicode:characters_to_binary/1,
         do_default_ciphers(Which)
     ).
 
@@ -2510,7 +2510,7 @@ bin_str_converter(I, _) when is_integer(I) ->
     integer_to_binary(I);
 bin_str_converter(X, _) ->
     try
-        iolist_to_binary(X)
+        unicode:characters_to_binary(X)
     catch
         _:_ ->
             throw("must_quote")
@@ -2665,7 +2665,7 @@ to_comma_separated_list(Str) ->
     {ok, string:tokens(Str, ", ")}.
 
 to_comma_separated_binary(Str) ->
-    {ok, lists:map(fun erlang:list_to_binary/1, string:tokens(Str, ", "))}.
+    {ok, lists:map(fun unicode:characters_to_binary/1, string:tokens(Str, ", "))}.
 
 to_comma_separated_atoms(Str) ->
     {ok, lists:map(fun to_atom/1, string:tokens(Str, ", "))}.
@@ -2674,7 +2674,7 @@ to_url(Str) ->
     case emqx_http_lib:uri_parse(Str) of
         {ok, URIMap} ->
             URIString = emqx_http_lib:normalize(URIMap),
-            {ok, iolist_to_binary(URIString)};
+            {ok, unicode:characters_to_binary(URIString)};
         Error ->
             Error
     end.
@@ -2682,13 +2682,13 @@ to_url(Str) ->
 to_json_binary(Str) ->
     case emqx_utils_json:safe_decode(Str) of
         {ok, _} ->
-            {ok, iolist_to_binary(Str)};
+            {ok, unicode:characters_to_binary(Str)};
         Error ->
             Error
     end.
 
 to_template(Str) ->
-    {ok, iolist_to_binary(Str)}.
+    {ok, unicode:characters_to_binary(Str, utf8)}.
 
 to_template_str(Str) ->
     {ok, unicode:characters_to_list(Str, utf8)}.
@@ -2784,7 +2784,7 @@ validate_keepalive_multiplier(_Multiplier) ->
     {error, #{reason => keepalive_multiplier_out_of_range, min => 1, max => 65535}}.
 
 validate_tcp_keepalive(Value) ->
-    case iolist_to_binary(Value) of
+    case unicode:characters_to_binary(Value) of
         <<"none">> ->
             ok;
         _ ->
@@ -2965,7 +2965,7 @@ convert_servers(undefined) ->
 convert_servers(Map) when is_map(Map) ->
     try
         List = convert_hocon_map_host_port(Map),
-        iolist_to_binary(string:join(List, ","))
+        unicode:characters_to_binary(string:join(List, ","))
     catch
         _:_ ->
             throw("bad_host_port")
@@ -2973,13 +2973,13 @@ convert_servers(Map) when is_map(Map) ->
 convert_servers([H | _] = Array) when is_binary(H) orelse is_list(H) ->
     %% if the old config was a string array
     %% we want to make sure it's converted to a comma-separated
-    iolist_to_binary([[I, ","] || I <- Array]);
+    unicode:characters_to_binary([[I, ","] || I <- Array]);
 convert_servers(Str) ->
     normalize_host_port_str(Str).
 
 %% remove spaces around comma (,)
 normalize_host_port_str(Str) ->
-    iolist_to_binary(re:replace(Str, "(\s)*,(\s)*", ",")).
+    unicode:characters_to_binary(re:replace(Str, "(\s)*,(\s)*", ",")).
 
 %% @doc Shared validation function for both 'server' and 'servers' string.
 %% NOTE: Validator is called after converter.
@@ -3458,8 +3458,10 @@ ensure_default_listener(Map, ListenerType) ->
     NewMap = Map#{<<"default">> => default_listener(ListenerType)},
     keep_default_tombstone(NewMap, #{}).
 
-cert_file(_File, client) -> undefined;
-cert_file(File, server) -> iolist_to_binary(filename:join(["${EMQX_ETC_DIR}", "certs", File])).
+cert_file(_File, client) ->
+    undefined;
+cert_file(File, server) ->
+    unicode:characters_to_binary(filename:join(["${EMQX_ETC_DIR}", "certs", File])).
 
 mqtt_converter(#{<<"keepalive_multiplier">> := Multi} = Mqtt, _Opts) ->
     case round(Multi * 100) =:= round(?DEFAULT_MULTIPLIER * 100) of
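Note on the `iolist_to_binary/1` → `unicode:characters_to_binary/1` swap above: iolists may only contain bytes 0..255, so converting strings that hold multi-byte code points crashes, while `unicode:characters_to_binary/1` UTF-8-encodes them. A minimal Erlang shell sketch, illustrative only and not part of this diff:

    %% "中文" is the code point list [20013, 25991].
    unicode:characters_to_binary("中文").
    %% => <<228,184,173,230,150,135>>   (the UTF-8 bytes of <<"中文"/utf8>>)
    iolist_to_binary("中文").
    %% => ** exception error: bad argument
    %%    (code points above 255 are not valid iolist elements)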

+ 16 - 3
apps/emqx/src/emqx_trace/emqx_trace_formatter.erl

@@ -89,8 +89,17 @@ weight({packet, _}) -> {0, packet};
 weight({payload, _}) -> {2, payload};
 weight({K, _}) -> {1, K}.
 
-format_packet(undefined, _) -> "";
-format_packet(Packet, Encode) -> emqx_packet:format(Packet, Encode).
+format_packet(undefined, _) ->
+    "";
+format_packet(Packet, Encode) ->
+    try
+        emqx_packet:format(Packet, Encode)
+    catch
+        _:_ ->
+            %% We don't want to crash if there is a field named packet with
+            %% some other type of value
+            Packet
+    end.
 
 format_payload(undefined, _) ->
     "";
@@ -100,7 +109,11 @@ format_payload(Payload, text) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) ->
     unicode:characters_to_list(Payload);
 format_payload(Payload, hex) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) -> binary:encode_hex(Payload);
 format_payload(<<Part:?TRUNCATED_PAYLOAD_SIZE/binary, _/binary>> = Payload, Type) ->
-    emqx_packet:format_truncated_payload(Part, byte_size(Payload), Type).
+    emqx_packet:format_truncated_payload(Part, byte_size(Payload), Type);
+format_payload(Payload, _) ->
+    %% We don't want to crash if there is a field named payload with some other
+    %% type of value
+    Payload.
 
 to_iolist(Atom) when is_atom(Atom) -> atom_to_list(Atom);
 to_iolist(Int) when is_integer(Int) -> integer_to_list(Int);

+ 12 - 0
apps/emqx/test/emqx_schema_tests.erl

@@ -930,3 +930,15 @@ timeout_types_test_() ->
             typerefl:from_string(emqx_schema:timeout_duration_s(), <<"4294967001ms">>)
         )
     ].
+
+unicode_template_test() ->
+    Sc = #{
+        roots => [root],
+        fields => #{root => [{template, #{type => emqx_schema:template()}}]}
+    },
+    HoconText = <<"root = {template = \"中文\"}"/utf8>>,
+    {ok, Hocon} = hocon:binary(HoconText),
+    ?assertEqual(
+        #{<<"root">> => #{<<"template">> => <<"中文"/utf8>>}},
+        hocon_tconf:check_plain(Sc, Hocon)
+    ).

+ 1 - 1
apps/emqx_auth/src/emqx_auth.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_auth, [
     {description, "EMQX Authentication and authorization"},
-    {vsn, "0.4.0"},
+    {vsn, "0.3.1"},
     {modules, []},
     {registered, [emqx_auth_sup]},
     {applications, [

+ 1 - 1
apps/emqx_bridge_azure_event_hub/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.3"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},

+ 1 - 1
apps/emqx_bridge_confluent/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.3"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},

+ 1 - 1
apps/emqx_bridge_kafka/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.3"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},

+ 48 - 27
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -214,7 +214,7 @@ ensure_client(ClientId, Hosts, ClientConfig) ->
     case wolff_client_sup:find_client(ClientId) of
         {ok, _Pid} ->
             ok;
-        {error, no_such_client} ->
+        {error, #{reason := no_such_client}} ->
             case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
                 {ok, _} ->
                     ?SLOG(info, #{
@@ -546,13 +546,13 @@ check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions) ->
         {ok, Pid} ->
             ok = check_topic_status(ClientId, Pid, KafkaTopic),
             ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions);
-        {error, no_such_client} ->
+        {error, #{reason := no_such_client}} ->
             throw(#{
                 reason => cannot_find_kafka_client,
                 kafka_client => ClientId,
                 kafka_topic => KafkaTopic
             });
-        {error, client_supervisor_not_initialized} ->
+        {error, #{reason := client_supervisor_not_initialized}} ->
             throw(#{
                 reason => restarting,
                 kafka_client => ClientId,
@@ -593,33 +593,54 @@ maybe_check_health_check_topic(_) ->
     %% Cannot infer further information.  Maybe upgraded from older version.
     ?status_connected.
 
+is_alive(Pid) ->
+    is_pid(Pid) andalso erlang:is_process_alive(Pid).
+
+error_summary(Map, [Error]) ->
+    Map#{error => Error};
+error_summary(Map, [Error | More]) ->
+    Map#{first_error => Error, total_errors => length(More) + 1}.
+
 check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) ->
-    Leaders =
-        case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of
-            {ok, LeadersToCheck} ->
-                %% Kafka is considered healthy as long as any of the partition leader is reachable.
-                lists:filtermap(
-                    fun({_Partition, Pid}) ->
-                        case is_pid(Pid) andalso erlang:is_process_alive(Pid) of
-                            true -> {true, Pid};
-                            _ -> false
-                        end
-                    end,
-                    LeadersToCheck
-                );
-            {error, _} ->
-                []
-        end,
-    case Leaders of
-        [] ->
+    case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of
+        {ok, Leaders} ->
+            %% Kafka is considered healthy as long as any of the partition leader is reachable.
+            case lists:partition(fun({_Partition, Pid}) -> is_alive(Pid) end, Leaders) of
+                {[], Errors} ->
+                    throw(
+                        error_summary(
+                            #{
+                                cause => "no_connected_partition_leader",
+                                kafka_client => ClientId,
+                                kafka_topic => KafkaTopic
+                            },
+                            Errors
+                        )
+                    );
+                {_, []} ->
+                    ok;
+                {_, Errors} ->
+                    ?SLOG(
+                        warning,
+                        "not_all_kafka_partitions_connected",
+                        error_summary(
+                            #{
+                                kafka_client => ClientId,
+                                kafka_topic => KafkaTopic
+                            },
+                            Errors
+                        )
+                    ),
+                    ok
+            end;
+        {error, Reason} ->
+            %% If failed to fetch metadata, wolff_client logs a warning level message
+            %% which includes the reason for each seed host
             throw(#{
-                error => no_connected_partition_leader,
+                cause => Reason,
                 kafka_client => ClientId,
-                kafka_topic => KafkaTopic,
-                partitions_limit => MaxPartitions
-            });
-        _ ->
-            ok
+                kafka_topic => KafkaTopic
+            })
     end.
 
 check_topic_status(ClientId, WolffClientPid, KafkaTopic) ->
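For reference, the `error_summary/2` helper above keeps health-check reports compact: a single error is attached as `error`, while several errors are collapsed into the first one plus a count. A hedged sketch of the resulting shapes (placeholder atoms here; in the code above the list elements are the non-alive `{Partition, Pid}` pairs returned by `lists:partition/2`):

    %% Illustrative only, derived from the error_summary/2 clauses above.
    error_summary(#{kafka_topic => <<"t">>}, [conn_down]).
    %% => #{kafka_topic => <<"t">>, error => conn_down}
    error_summary(#{kafka_topic => <<"t">>}, [conn_down, timeout, timeout]).
    %% => #{kafka_topic => <<"t">>, first_error => conn_down, total_errors => 3}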

+ 1 - 1
apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_redis, [
     {description, "EMQX Enterprise Redis Bridge"},
-    {vsn, "0.1.6"},
+    {vsn, "0.1.7"},
     {registered, []},
     {applications, [
         kernel,

+ 12 - 1
apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl

@@ -6,6 +6,7 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx_resource/include/emqx_resource.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx/include/emqx_trace.hrl").
 
 -behaviour(emqx_resource).
 
@@ -143,7 +144,13 @@ on_batch_query(
             [{ChannelID, _} | _] = BatchData,
             emqx_trace:rendered_action_template(
                 ChannelID,
-                #{commands => Cmds, batch => ture}
+                #{
+                    commands => #emqx_trace_format_func_data{
+                        function = fun trace_format_commands/1,
+                        data = Cmds
+                    },
+                    batch => true
+                }
             ),
             Result = query(InstId, {cmds, Cmds}, RedisConnSt),
             ?tp(
@@ -162,6 +169,10 @@ on_batch_query(
             Error
     end.
 
+trace_format_commands(Commands0) ->
+    Commands1 = [lists:join(" ", C) || C <- Commands0],
+    unicode:characters_to_binary(lists:join("; ", Commands1)).
+
 on_format_query_result({ok, Msg}) ->
     #{result => ok, message => Msg};
 on_format_query_result(Res) ->
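The new `trace_format_commands/1` produces the readable Redis trace described in `changes/ee/fix-13130.en.md`: the parts of each command are joined with spaces, and commands are joined with "; ". A sketch with hypothetical commands:

    %% Illustrative only; the command lists are hypothetical.
    trace_format_commands([["SET", "k1", "v1"], ["GET", "k2"]]).
    %% => <<"SET k1 v1; GET k2">>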

+ 1 - 1
apps/emqx_license/test/emqx_license_http_api_SUITE.erl

@@ -245,7 +245,7 @@ t_license_setting_bc(_Config) ->
     ?assertMatch(#{<<"max_connections">> := 25}, request_dump()),
     %% get
     GetRes = request(get, uri(["license", "setting"]), []),
-    %% aslo check that the settings return correctly
+    %% also check that the settings return correctly
     validate_setting(GetRes, <<"75%">>, <<"80%">>, 25),
     %% update
     Low = <<"50%">>,

+ 1 - 1
apps/emqx_management/src/emqx_management.app.src

@@ -2,7 +2,7 @@
 {application, emqx_management, [
     {description, "EMQX Management API and CLI"},
     % strict semver, bump manually!
-    {vsn, "5.3.0"},
+    {vsn, "5.2.1"},
     {modules, []},
     {registered, [emqx_management_sup]},
     {applications, [

+ 1 - 1
apps/emqx_oracle/src/emqx_oracle.app.src

@@ -1,6 +1,6 @@
 {application, emqx_oracle, [
     {description, "EMQX Enterprise Oracle Database Connector"},
-    {vsn, "0.2.0"},
+    {vsn, "0.2.1"},
     {registered, []},
     {applications, [
         kernel,

+ 11 - 1
apps/emqx_oracle/src/emqx_oracle.erl

@@ -8,6 +8,7 @@
 
 -include_lib("emqx_resource/include/emqx_resource.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("emqx/include/emqx_trace.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(UNHEALTHY_TARGET_MSG,
@@ -288,7 +289,7 @@ on_sql_query(InstId, ChannelID, PoolName, Type, ApplyMode, NameOrSQL, Data) ->
         type => Type,
         apply_mode => ApplyMode,
         name_or_sql => NameOrSQL,
-        data => Data
+        data => #emqx_trace_format_func_data{function = fun trace_format_data/1, data = Data}
     }),
     case ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, ApplyMode) of
         {error, Reason} = Result ->
@@ -317,6 +318,15 @@ on_sql_query(InstId, ChannelID, PoolName, Type, ApplyMode, NameOrSQL, Data) ->
             Result
     end.
 
+trace_format_data(Data0) ->
+    %% In batch request, we get a two level list
+    {'$array$', lists:map(fun insert_array_marker_if_list/1, Data0)}.
+
+insert_array_marker_if_list(List) when is_list(List) ->
+    {'$array$', List};
+insert_array_marker_if_list(Item) ->
+    Item.
+
 on_get_status(_InstId, #{pool_name := Pool} = _State) ->
     case emqx_resource_pool:health_check_workers(Pool, fun ?MODULE:do_get_status/1) of
         true ->
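`trace_format_data/1` wraps each batch row in the `'$array$'` marker that the `emqx_logger_jsonfmt` change above recognizes, so nested lists are rendered as JSON arrays in traces rather than flattened as IO data. A sketch with hypothetical row values:

    %% Illustrative only; row values are hypothetical.
    trace_format_data([[<<"alice">>, 1], [<<"bob">>, 2]]).
    %% => {'$array$', [{'$array$', [<<"alice">>, 1]},
    %%                 {'$array$', [<<"bob">>, 2]}]}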

+ 1 - 1
apps/emqx_schema_registry/src/emqx_schema_registry.app.src

@@ -1,6 +1,6 @@
 {application, emqx_schema_registry, [
     {description, "EMQX Schema Registry"},
-    {vsn, "0.3.0"},
+    {vsn, "0.3.1"},
     {registered, [emqx_schema_registry_sup]},
     {mod, {emqx_schema_registry_app, []}},
     {included_applications, [

+ 15 - 1
apps/emqx_schema_registry/src/emqx_schema_registry_serde.erl

@@ -64,7 +64,21 @@ handle_rule_function(sparkplug_encode, [Term | MoreArgs]) ->
         [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs]
     );
 handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) ->
-    decode(SchemaId, Data, MoreArgs);
+    try
+        decode(SchemaId, Data, MoreArgs)
+    catch
+        error:{gpb_error, {decoding_failure, {_Data, _Schema, {error, function_clause, _Stack}}}} ->
+            throw(
+                {schema_decode_error, #{
+                    error_type => decoding_failure,
+                    schema_id => SchemaId,
+                    data => Data,
+                    more_args => MoreArgs,
+                    explain =>
+                        <<"The given data could not be decoded. Please check the input data and the schema.">>
+                }}
+            )
+    end;
 handle_rule_function(schema_decode, Args) ->
     error({args_count_error, {schema_decode, Args}});
 handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) ->

+ 19 - 1
apps/emqx_schema_registry/test/emqx_schema_registry_SUITE.erl

@@ -44,7 +44,8 @@ sparkplug_tests() ->
         t_sparkplug_decode,
         t_sparkplug_encode,
         t_sparkplug_decode_encode_with_message_name,
-        t_sparkplug_encode_float_to_uint64_key
+        t_sparkplug_encode_float_to_uint64_key,
+        t_decode_fail
     ].
 
 init_per_suite(Config) ->
@@ -532,6 +533,23 @@ t_encode(Config) ->
     end,
     ok.
 
+t_decode_fail(_Config) ->
+    SerdeName = my_serde,
+    SerdeType = protobuf,
+    ok = create_serde(SerdeType, SerdeName),
+    Payload = <<"ss">>,
+    ?assertThrow(
+        {schema_decode_error, #{
+            data := <<"ss">>,
+            error_type := decoding_failure,
+            explain := _,
+            more_args := [<<"Person">>],
+            schema_id := <<"my_serde">>
+        }},
+        emqx_rule_funcs:schema_decode(<<"my_serde">>, Payload, <<"Person">>)
+    ),
+    ok.
+
 t_decode(Config) ->
     SerdeType = ?config(serde_type, Config),
     SerdeName = my_serde,

+ 1 - 0
changes/ce/fix-13140.en.md

@@ -0,0 +1 @@
+The issue causing text traces for the republish action to crash and not display correctly has been resolved.

+ 5 - 0
changes/ee/fix-13070.en.md

@@ -0,0 +1,5 @@
+Improve Kafka connector error logs.
+
+Previously, specific error details, such as unreachable advertised listeners, were not logged.
+Now, error details are captured in the logs to provide more diagnostic information.
+To manage log verbosity, only the first occurrence of an error is logged, accompanied by the total count of similar errors.

+ 6 - 0
changes/ee/fix-13079.en.md

@@ -0,0 +1,6 @@
+Improve Kafka producer error handling for `message_too_large`.
+
+Prior to this change, Kafka producers would retry sending oversized batches (`message_too_large` error) in hopes of a server side configuration fix (`max.message.bytes`).
+
+Now, oversized messages are automatically split into single-message batches for retry.
+If a message still exceeds size limits, it will be dropped to maintain data flow.

+ 1 - 0
changes/ee/fix-13130.en.md

@@ -0,0 +1 @@
+Traces for Redis action batch requests have got improved formatting. Spaces are now added between components of commands and semicolons are added between commands to make the trace message easier to read.

+ 1 - 0
changes/ee/fix-13136.en.md

@@ -0,0 +1 @@
+The template-rendered traces for Oracle actions have been enhanced for better readability.

+ 1 - 0
changes/ee/fix-13147.en.md

@@ -0,0 +1 @@
+Error messages for decoding failures in the rule engine protobuf decode functions have been improved by adding a clear description to indicate what went wrong when message decoding fails.

+ 3 - 2
deploy/packages/deb/debian/postrm

@@ -21,6 +21,8 @@ set -e
 
 case "$1" in
     purge)
+        # force kill all processes owned by emqx, if any
+        pkill -9 -u emqx || true
         rm -f /etc/default/emqx
 
         if [ -d /var/lib/emqx ]; then
@@ -38,9 +40,8 @@ case "$1" in
         if [ -e /etc/init.d/emqx ]; then
                 rm  /etc/init.d/emqx
         fi
-        # Remove User & Group, killing any process owned by them
+        # Remove User & Group
         if getent passwd emqx >/dev/null; then
-                pkill -u emqx || true
                 deluser --quiet --system emqx
         fi
         if getent group emqx >/dev/null; then

+ 2 - 2
mix.exs

@@ -54,7 +54,7 @@ defmodule EMQXUmbrella.MixProject do
       {:jiffy, github: "emqx/jiffy", tag: "1.0.6", override: true},
       {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},
       {:esockd, github: "emqx/esockd", tag: "5.11.2", override: true},
-      {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-2", override: true},
+      {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-5", override: true},
       {:ekka, github: "emqx/ekka", tag: "0.19.3", override: true},
       {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true},
       {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true},
@@ -210,7 +210,7 @@ defmodule EMQXUmbrella.MixProject do
       {:hstreamdb_erl,
        github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"},
       {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
-      {:wolff, github: "kafka4beam/wolff", tag: "1.10.3"},
+      {:wolff, github: "kafka4beam/wolff", tag: "1.10.4"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
       {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
       {:brod, github: "kafka4beam/brod", tag: "3.16.8"},

+ 1 - 1
rebar.config

@@ -82,7 +82,7 @@
     {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}},
     {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
     {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}},
-    {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-2"}}},
+    {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-5"}}},
     {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.3"}}},
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
     {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}},

+ 15 - 0
scripts/pkg-tests.sh

@@ -131,6 +131,21 @@ emqx_test(){
                 exit 1
             fi
 
+            echo "try to install again and purge while the service is running"
+            dpkg -i "${PACKAGE_PATH}/${packagename}"
+            if [ "$(dpkg -l | grep ${EMQX_NAME} | awk '{print $1}')" != "ii" ]
+            then
+                echo "package install error"
+                exit 1
+            fi
+            if ! /usr/bin/emqx start
+            then
+                echo "ERROR: failed_to_start_emqx"
+                cat /var/log/emqx/erlang.log.1 || true
+                cat /var/log/emqx/emqx.log.1 || true
+                exit 1
+            fi
+            /usr/bin/emqx ping
             dpkg -P "${EMQX_NAME}"
             if dpkg -l |grep -q emqx
             then

+ 9 - 1
scripts/ui-tests/dashboard_test.py

@@ -100,7 +100,15 @@ def test_docs_link(driver, login, dashboard_url):
     driver.get(dest_url)
     ensure_current_url(driver, dest_url)
     xpath_link_help = "//div[@id='app']//div[@class='nav-header']//a[contains(@class, 'link-help')]"
-    link_help = driver.find_element(By.XPATH, xpath_link_help)
+    # retry up to 5 times
+    for _ in range(5):
+        try:
+            link_help = driver.find_element(By.XPATH, xpath_link_help)
+            break
+        except NoSuchElementException:
+            time.sleep(1)
+    else:
+        raise AssertionError("Cannot find the help link")
     driver.execute_script("arguments[0].click();", link_help)
 
     prefix, emqx_version = fetch_version(dashboard_url)