Просмотр исходного кода

Merge pull request #13662 from thalesmg/20240821-m-sync-r58

sync `release-58` to `master`
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
8131df8f6f
31 измененных файлов с 1342 добавлено и 114 удалено
  1. 1 1
      .ci/docker-compose-file/docker-compose-kdc.yaml
  2. 1 1
      .ci/docker-compose-file/docker-compose.yaml
  3. 2 2
      .github/workflows/build_packages.yaml
  4. 1 1
      .github/workflows/performance_test.yaml
  5. 1 1
      .tool-versions
  6. 19 6
      apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl
  7. 11 1
      apps/emqx/src/emqx_broker.erl
  8. 12 1
      apps/emqx/src/emqx_persistent_message.erl
  9. 9 2
      apps/emqx/src/emqx_persistent_session_ds.erl
  10. 855 12
      apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl
  11. 2 0
      apps/emqx/src/emqx_persistent_session_ds/session_internals.hrl
  12. 154 4
      apps/emqx/test/emqx_persistent_session_ds_state_tests.erl
  13. 2 1
      apps/emqx/test/emqx_proper_types.erl
  14. 2 1
      apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl
  15. 51 23
      apps/emqx_management/src/emqx_mgmt_cli.erl
  16. 28 26
      apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl
  17. 23 6
      apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl
  18. 45 4
      apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl
  19. 16 7
      apps/emqx_rule_engine/src/emqx_rule_actions.erl
  20. 13 5
      apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
  21. 1 3
      apps/emqx_rule_engine/src/emqx_rule_events.erl
  22. 6 0
      apps/emqx_rule_engine/src/emqx_rule_funcs.erl
  23. 42 1
      apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
  24. 2 1
      apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl
  25. 12 0
      apps/emqx_utils/src/emqx_variform_bif.erl
  26. 4 0
      changes/ce/feat-13516.en.md
  27. 1 1
      deploy/docker/Dockerfile
  28. 2 2
      env.sh
  29. 6 1
      mix.exs
  30. 1 0
      rebar.config.erl
  31. 17 0
      rel/i18n/emqx_rule_engine_schema.hocon

+ 1 - 1
.ci/docker-compose-file/docker-compose-kdc.yaml

@@ -3,7 +3,7 @@ version: '3.9'
 services:
   kdc:
     hostname: kdc.emqx.net
-    image:  ghcr.io/emqx/emqx-builder/5.3-9:1.15.7-26.2.5-3-ubuntu22.04
+    image:  ghcr.io/emqx/emqx-builder/5.3-11:1.15.7-26.2.5.2-1-ubuntu22.04
     container_name: kdc.emqx.net
     expose:
       - 88 # kdc

+ 1 - 1
.ci/docker-compose-file/docker-compose.yaml

@@ -4,7 +4,7 @@ services:
   erlang:
     hostname: erlang.emqx.net
     container_name: erlang
-    image: ${DOCKER_CT_RUNNER_IMAGE:-ghcr.io/emqx/emqx-builder/5.3-9:1.15.7-26.2.5-3-ubuntu22.04}
+    image: ${DOCKER_CT_RUNNER_IMAGE:-ghcr.io/emqx/emqx-builder/5.3-11:1.15.7-26.2.5.2-1-ubuntu22.04}
     env_file:
       - credentials.env
       - conf.env

+ 2 - 2
.github/workflows/build_packages.yaml

@@ -55,7 +55,7 @@ on:
       otp_vsn:
         required: false
         type: string
-        default: '26.2.5-3'
+        default: '26.2.5.2-1'
       elixir_vsn:
         required: false
         type: string
@@ -63,7 +63,7 @@ on:
       builder_vsn:
         required: false
         type: string
-        default: '5.3-9'
+        default: '5.3-11'
 
 permissions:
   contents: read

+ 1 - 1
.github/workflows/performance_test.yaml

@@ -26,7 +26,7 @@ jobs:
   prepare:
     runs-on: ubuntu-latest
     if: github.repository_owner == 'emqx'
-    container: ghcr.io/emqx/emqx-builder/5.3-9:1.15.7-26.2.5-3-ubuntu20.04
+    container: ghcr.io/emqx/emqx-builder/5.3-11:1.15.7-26.2.5-1-ubuntu20.04
     outputs:
       BENCH_ID: ${{ steps.prepare.outputs.BENCH_ID }}
       PACKAGE_FILE: ${{ steps.package_file.outputs.PACKAGE_FILE }}

+ 1 - 1
.tool-versions

@@ -1,2 +1,2 @@
-erlang 26.2.5-3
+erlang 26.2.5.2-1
 elixir 1.15.7-otp-26

+ 19 - 6
apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl

@@ -14,6 +14,8 @@
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
+-define(DURABLE_SESSION_STATE, emqx_persistent_session).
+
 %%------------------------------------------------------------------------------
 %% CT boilerplate
 %%------------------------------------------------------------------------------
@@ -206,6 +208,14 @@ force_last_alive_at(ClientId, Time) ->
     _ = emqx_persistent_session_ds_state:commit(S),
     ok.
 
+stop_and_commit(Client) ->
+    {ok, {ok, _}} =
+        ?wait_async_action(
+            emqtt:stop(Client),
+            #{?snk_kind := persistent_session_ds_terminate}
+        ),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
@@ -327,7 +337,7 @@ t_session_unsubscription_idempotency(Config) ->
                     15_000
                 ),
 
-            ok = emqtt:stop(Client1),
+            ok = stop_and_commit(Client1),
 
             ok
         end,
@@ -659,11 +669,14 @@ t_session_replay_retry(_Config) ->
 
     %% Make `emqx_ds` believe that roughly half of the shards are unavailable.
     ok = emqx_ds_test_helpers:mock_rpc_result(
-        fun(_Node, emqx_ds_replication_layer, _Function, [_DB, Shard | _]) ->
-            case erlang:phash2(Shard) rem 2 of
-                0 -> unavailable;
-                1 -> passthrough
-            end
+        fun
+            (_Node, emqx_ds_replication_layer, _Function, [?DURABLE_SESSION_STATE, _Shard | _]) ->
+                passthrough;
+            (_Node, emqx_ds_replication_layer, _Function, [_DB, Shard | _]) ->
+                case erlang:phash2(Shard) rem 2 of
+                    0 -> unavailable;
+                    1 -> passthrough
+                end
         end
     ),
 

+ 11 - 1
apps/emqx/src/emqx_broker.erl

@@ -97,7 +97,9 @@
 -type publish_opts() :: #{
     %% Whether to return a disinguishing value `{blocked, #message{}}' when a hook from
     %% `'message.publish''` returns `allow_publish => false'.  Defaults to `false'.
-    hook_prohibition_as_error => boolean()
+    hook_prohibition_as_error => boolean(),
+    %% do not call message.publish hook point if true
+    bypass_hook => boolean()
 }.
 
 -spec start_link(atom(), pos_integer()) -> startlink_ret().
@@ -243,6 +245,14 @@ publish(#message{} = Msg) ->
 publish(#message{} = Msg, Opts) ->
     _ = emqx_trace:publish(Msg),
     emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
+    case maps:get(bypass_hook, Opts, false) of
+        true ->
+            do_publish(Msg);
+        false ->
+            eval_hook_and_publish(Msg, Opts)
+    end.
+
+eval_hook_and_publish(Msg, Opts) ->
     case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
         #message{headers = #{should_disconnect := true}, topic = Topic} ->
             ?TRACE("MQTT", "msg_publish_not_allowed_disconnect", #{

+ 12 - 1
apps/emqx/src/emqx_persistent_message.erl

@@ -47,7 +47,7 @@ init() ->
         ?SLOG(notice, #{msg => "Session durability is enabled"}),
         ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, get_db_config()),
         ok = emqx_persistent_session_ds_router:init_tables(),
-        ok = emqx_persistent_session_ds:create_tables(),
+        ok = initialize_session_ds_state(),
         ok
     end).
 
@@ -69,6 +69,17 @@ get_db_config() ->
 force_ds(Zone) ->
     emqx_config:get_zone_conf(Zone, [durable_sessions, force_persistence]).
 
+-ifdef(STORE_STATE_IN_DS).
+initialize_session_ds_state() ->
+    Config = emqx_ds_schema:db_config([durable_storage, sessions]),
+    ok = emqx_persistent_session_ds_state:open_db(Config).
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
+initialize_session_ds_state() ->
+    ok = emqx_persistent_session_ds_state:create_tables().
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
+
 %%--------------------------------------------------------------------
 
 -spec add_handler() -> ok.

+ 9 - 2
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -82,7 +82,11 @@
 ]).
 
 %% session table operations
--export([create_tables/0, sync/1]).
+-export([sync/1]).
+-ifndef(STORE_STATE_IN_DS).
+-export([create_tables/0]).
+%% END ifndef(STORE_STATE_IN_DS).
+-endif.
 
 %% internal export used by session GC process
 -export([destroy_session/1]).
@@ -832,8 +836,11 @@ get_client_subscription(ClientId, TopicFilter) ->
 %% Session tables operations
 %%--------------------------------------------------------------------
 
+-ifndef(STORE_STATE_IN_DS).
 create_tables() ->
     emqx_persistent_session_ds_state:create_tables().
+%% END ifndef(STORE_STATE_IN_DS).
+-endif.
 
 %% @doc Force syncing of the transient state to persistent storage
 sync(ClientId) ->
@@ -939,7 +946,7 @@ session_ensure_new(
     S5 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S4),
     S6 = set_clientinfo(ClientInfo, S5),
     S7 = emqx_persistent_session_ds_state:set_protocol({ProtoName, ProtoVer}, S6),
-    S = emqx_persistent_session_ds_state:commit(S7),
+    S = emqx_persistent_session_ds_state:commit(S7, #{ensure_new => true}),
     #{
         id => Id,
         props => Conf,

Разница между файлами не показана из-за своего большого размера
+ 855 - 12
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl


+ 2 - 0
apps/emqx/src/emqx_persistent_session_ds/session_internals.hrl

@@ -19,6 +19,8 @@
 -include("emqx_persistent_message.hrl").
 -include("emqx_durable_session_metadata.hrl").
 
+-define(DURABLE_SESSION_STATE, emqx_persistent_session).
+
 -define(SESSION_TAB, emqx_ds_session).
 -define(SESSION_SUBSCRIPTIONS_TAB, emqx_ds_session_subscriptions).
 -define(SESSION_STREAM_TAB, emqx_ds_stream_tab).

+ 154 - 4
apps/emqx/test/emqx_persistent_session_ds_state_tests.erl

@@ -21,12 +21,13 @@
 -include_lib("proper/include/proper.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
--define(tab, ?MODULE).
-
 %%================================================================================
 %% Type declarations
 %%================================================================================
 
+-define(tab, ?MODULE).
+-define(DB, emqx_persistent_session).
+
 %% Note: here `committed' != `dirty'. It means "has been committed at
 %% least once since the creation", and it's used by the iteration
 %% test.
@@ -34,6 +35,15 @@
 
 -type state() :: #{emqx_persistent_session_ds:id() => #s{}}.
 
+-define(metadata_domain, metadata).
+-define(metadata_domain_bin, <<"metadata">>).
+-define(subscription_domain, subscription).
+-define(subscription_state_domain, subscription_state).
+-define(stream_domain, stream).
+-define(rank_domain, rank).
+-define(seqno_domain, seqno).
+-define(awaiting_rel_domain, awaiting_rel).
+
 %%================================================================================
 %% Properties
 %%================================================================================
@@ -62,6 +72,51 @@ prop_consistency() ->
         end
     ).
 
+-ifdef(STORE_STATE_IN_DS).
+%% Verifies that our internal keys generated for stream keys preserve the order relation
+%% between them.
+stream_order_internal_keys_proper_test_() ->
+    Props = [prop_stream_order_internal_keys()],
+    Opts = [{numtests, 100}, {to_file, user}, {max_size, 100}],
+    {timeout, 300, [?_assert(proper:quickcheck(Prop, Opts)) || Prop <- Props]}.
+
+prop_stream_order_internal_keys() ->
+    ?FORALL(
+        {Id, Streams0},
+        {session_id(), list({non_neg_integer(), value_gen(), stream_state()})},
+        try
+            init(),
+            Streams = lists:uniq(Streams0),
+            StreamKeys = [{R, S} || {R, S, _SS} <- Streams],
+            ExpectedRanks = lists:sort([R || {R, _S, _SS} <- Streams]),
+            S = lists:foldl(
+                fun({R, S, SS}, Acc) ->
+                    emqx_persistent_session_ds_state:put_stream({R, S}, SS, Acc)
+                end,
+                emqx_persistent_session_ds_state:create_new(Id),
+                Streams
+            ),
+            RevRanks = emqx_persistent_session_ds_state:fold_streams(
+                fun({R, _S}, _SS, Acc) -> [R | Acc] end,
+                [],
+                S
+            ),
+            Ranks = lists:reverse(RevRanks),
+            ?WHENFAIL(
+                io:format(
+                    user,
+                    "Expected ranks:\n  ~p\nRanks:\n  ~p\nStream keys:\n  ~p\n",
+                    [ExpectedRanks, Ranks, StreamKeys]
+                ),
+                ExpectedRanks =:= Ranks
+            )
+        after
+            clean()
+        end
+    ).
+%% -ifdef(STORE_STATE_IN_DS).
+-endif.
+
 %%================================================================================
 %% Generators
 %%================================================================================
@@ -109,17 +164,26 @@ seqno_track() ->
 seqno() ->
     range(1, 100).
 
+-ifdef(STORE_STATE_IN_DS).
+stream_id() ->
+    {range(1, 3), oneof([#{}, {}])}.
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 stream_id() ->
+    %% Note: this does not match the stream id type used in practice, which is a
+    %% `{emqx_persistent_session_ds:subscription_id(), emqx_ds:stream()}'
     range(1, 1).
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
-stream() ->
+stream_state() ->
     oneof([#{}]).
 
 put_req() ->
     oneof([
         ?LET(
             {Id, Stream},
-            {stream_id(), stream()},
+            {stream_id(), stream_state()},
             {#s.streams, put_stream, Id, Stream}
         ),
         ?LET(
@@ -147,6 +211,47 @@ del_req() ->
         {#s.subs, del_subscription, topic()}
     ]).
 
+value_gen() ->
+    oneof([#{}, loose_tuple(oneof([range(1, 3), binary()]))]).
+
+session_id_gen() ->
+    frequency([
+        {5, clientid()},
+        {1, <<"a/">>},
+        {1, <<"a/b">>},
+        {1, <<"a/+">>},
+        {1, <<"a/+/#">>},
+        {1, <<"#">>},
+        {1, <<"+">>},
+        {1, <<"/">>}
+    ]).
+
+clientid() ->
+    %% empty string is not valid...
+    ?SUCHTHAT(ClientId, emqx_proper_types:clientid(), ClientId =/= <<>>).
+
+domain_gen() ->
+    oneof([
+        ?metadata_domain,
+        ?subscription_domain,
+        ?subscription_state_domain,
+        ?stream_domain,
+        ?rank_domain,
+        ?seqno_domain,
+        ?awaiting_rel_domain
+    ]).
+
+key_gen(?metadata_domain) ->
+    <<"metadata">>;
+key_gen(?stream_domain) ->
+    ?LET(
+        {Rank, X},
+        {integer(), integer()},
+        <<Rank:64, X:64>>
+    );
+key_gen(_) ->
+    integer().
+
 command(S) ->
     case maps:size(S) > 0 of
         true ->
@@ -316,12 +421,57 @@ get_state(SessionId) ->
 put_state(SessionId, S) ->
     ets:insert(?tab, {SessionId, S}).
 
+-ifdef(STORE_STATE_IN_DS).
+init() ->
+    _ = ets:new(?tab, [named_table, public, {keypos, 1}]),
+    mria:start(),
+    {ok, _} = application:ensure_all_started(emqx_ds_backends),
+    Dir = binary_to_list(filename:join(["/tmp", emqx_guid:to_hexstr(emqx_guid:gen())])),
+    persistent_term:put({?MODULE, data_dir}, Dir),
+    application:set_env(emqx_durable_storage, db_data_dir, Dir),
+    Defaults = #{
+        backend => builtin_local,
+        force_monotonic_timestamps => false,
+        atomic_batches => true,
+        storage =>
+            {emqx_ds_storage_bitfield_lts, #{
+                topic_index_bytes => 4,
+                epoch_bits => 10,
+                bits_per_topic_level => 64
+            }},
+        n_shards => 16,
+        n_sites => 1,
+        replication_factor => 3,
+        replication_options => #{}
+    },
+    ok = emqx_persistent_session_ds_state:open_db(Defaults),
+    ok.
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 init() ->
     _ = ets:new(?tab, [named_table, public, {keypos, 1}]),
     mria:start(),
     emqx_persistent_session_ds_state:create_tables().
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
+-ifdef(STORE_STATE_IN_DS).
+clean() ->
+    ets:delete(?tab),
+    emqx_ds:drop_db(?DB),
+    application:stop(emqx_ds_backends),
+    application:stop(emqx_ds_builtin_local),
+    mria:stop(),
+    mria_mnesia:delete_schema(),
+    Dir = persistent_term:get({?MODULE, data_dir}),
+    persistent_term:erase({?MODULE, data_dir}),
+    ok = file:del_dir_r(Dir),
+    ok.
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 clean() ->
     ets:delete(?tab),
     mria:stop(),
     mria_mnesia:delete_schema().
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.

+ 2 - 1
apps/emqx/test/emqx_proper_types.erl

@@ -50,7 +50,8 @@
     printable_utf8/0,
     printable_codepoint/0,
     raw_duration/0,
-    large_raw_duration/0
+    large_raw_duration/0,
+    clientid/0
 ]).
 
 %% Generic Types

+ 2 - 1
apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl

@@ -173,7 +173,8 @@ t_source(Config) ->
                         payload => <<"${payload}">>,
                         qos => 0,
                         retain => false,
-                        user_properties => []
+                        user_properties => [],
+                        direct_dispatch => false
                     },
                     function => republish
                 }

+ 51 - 23
apps/emqx_management/src/emqx_mgmt_cli.erl

@@ -16,6 +16,8 @@
 
 -module(emqx_mgmt_cli).
 
+-feature(maybe_expr, enable).
+
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_cm.hrl").
 -include_lib("emqx/include/emqx_router.hrl").
@@ -750,28 +752,54 @@ listeners(["restart", ListenerId]) ->
             emqx_ctl:print("Invalid listener: ~0p~n", [ListenerId])
     end;
 listeners(["enable", ListenerId, Enable0]) ->
-    Enable = Enable0 =:= "true",
-    Action =
-        case Enable of
-            true ->
-                start;
-            _ ->
-                stop
-        end,
-    case emqx_listeners:parse_listener_id(ListenerId) of
-        {ok, #{type := Type, name := Name}} ->
-            RawConf = emqx_mgmt_listeners_conf:get_raw(Type, Name),
-            Conf = RawConf#{<<"enable">> := Enable},
-            case emqx_mgmt_listeners_conf:action(Type, Name, Action, Conf) of
-                {ok, _} ->
-                    emqx_ctl:print("Updated 'enable' to: '~0p' successfully.~n", [Enable]);
-                {error, Reason} ->
-                    emqx_ctl:print("Update listener: ~0p failed, Reason: ~0p~n", [
-                        ListenerId, Reason
-                    ])
-            end;
-        _ ->
-            emqx_ctl:print("Invalid listener: ~0p~n", [ListenerId])
+    maybe
+        {ok, Enable, Action} ?=
+            case Enable0 of
+                "true" ->
+                    {ok, true, start};
+                "false" ->
+                    {ok, false, stop};
+                _ ->
+                    {error, badarg}
+            end,
+        {ok, #{type := Type, name := Name}} ?= emqx_listeners:parse_listener_id(ListenerId),
+        #{<<"enable">> := OldEnable} ?= RawConf = emqx_conf:get_raw(
+            [listeners, Type, Name], {error, nout_found}
+        ),
+        {ok, AtomId} = emqx_utils:safe_to_existing_atom(ListenerId),
+        ok ?=
+            case Enable of
+                OldEnable ->
+                    %% `enable` and `running` may lose synchronization due to the start/stop commands
+                    case Action of
+                        start ->
+                            emqx_listeners:start_listener(AtomId);
+                        stop ->
+                            emqx_listeners:stop_listener(AtomId)
+                    end;
+                _ ->
+                    Conf = RawConf#{<<"enable">> := Enable},
+                    case emqx_mgmt_listeners_conf:action(Type, Name, Action, Conf) of
+                        {ok, _} ->
+                            ok;
+                        Error ->
+                            Error
+                    end
+            end,
+        emqx_ctl:print("Updated 'enable' to: '~0p' successfully.~n", [Enable])
+    else
+        {error, badarg} ->
+            emqx_ctl:print("Invalid bool argument: ~0p~n", [Enable0]);
+        {error, {invalid_listener_id, _Id}} ->
+            emqx_ctl:print("Invalid listener: ~0p~n", [ListenerId]);
+        {error, not_found} ->
+            emqx_ctl:print("Not found listener: ~0p~n", [ListenerId]);
+        {error, {already_started, _Pid}} ->
+            emqx_ctl:print("Updated 'enable' to: '~0p' successfully.~n", [Enable0]);
+        {error, Reason} ->
+            emqx_ctl:print("Update listener: ~0p failed, Reason: ~0p~n", [
+                ListenerId, Reason
+            ])
     end;
 listeners(_) ->
     emqx_ctl:usage([
@@ -779,7 +807,7 @@ listeners(_) ->
         {"listeners stop    <Identifier>", "Stop a listener"},
         {"listeners start   <Identifier>", "Start a listener"},
         {"listeners restart <Identifier>", "Restart a listener"},
-        {"listeners enable <Identifier> <Bool>", "Enable or disable a listener"}
+        {"listeners enable <Identifier> <true/false>", "Enable or disable a listener"}
     ]).
 
 %%--------------------------------------------------------------------

+ 28 - 26
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -104,27 +104,32 @@ init_per_group(general, Config) ->
         | Config
     ];
 init_per_group(persistent_sessions, Config) ->
-    AppSpecs = [
-        {emqx,
-            "durable_sessions.enable = true\n"
-            "durable_sessions.disconnected_session_count_refresh_interval = 100ms"},
-        emqx_management
-    ],
-    Dashboard = emqx_mgmt_api_test_util:emqx_dashboard(),
-    Cluster = [
-        {emqx_mgmt_api_clients_SUITE1, #{apps => AppSpecs ++ [Dashboard]}},
-        {emqx_mgmt_api_clients_SUITE2, #{apps => AppSpecs}}
-    ],
-    Nodes =
-        [N1 | _] = emqx_cth_cluster:start(
-            Cluster,
-            #{work_dir => emqx_cth_suite:work_dir(Config)}
-        ),
-    [
-        {nodes, Nodes},
-        {api_auth_header, erpc:call(N1, emqx_mgmt_api_test_util, auth_header_, [])}
-        | Config
-    ];
+    case emqx_ds_test_helpers:skip_if_norepl() of
+        false ->
+            AppSpecs = [
+                {emqx,
+                    "durable_sessions.enable = true\n"
+                    "durable_sessions.disconnected_session_count_refresh_interval = 100ms"},
+                emqx_management
+            ],
+            Dashboard = emqx_mgmt_api_test_util:emqx_dashboard(),
+            Cluster = [
+                {emqx_mgmt_api_clients_SUITE1, #{apps => AppSpecs ++ [Dashboard]}},
+                {emqx_mgmt_api_clients_SUITE2, #{apps => AppSpecs}}
+            ],
+            Nodes =
+                [N1 | _] = emqx_cth_cluster:start(
+                    Cluster,
+                    #{work_dir => emqx_cth_suite:work_dir(Config)}
+                ),
+            [
+                {nodes, Nodes},
+                {api_auth_header, erpc:call(N1, emqx_mgmt_api_test_util, auth_header_, [])}
+                | Config
+            ];
+        Yes ->
+            Yes
+    end;
 init_per_group(non_persistent_cluster, Config) ->
     AppSpecs = [
         emqx,
@@ -327,11 +332,8 @@ t_persistent_sessions1(Config) ->
             C2 = connect_client(#{port => Port1, clientid => ClientId}),
             assert_single_client(#{node => N1, clientid => ClientId, status => connected}, Config),
             %% 4) Client disconnects.
-            ok = emqtt:stop(C2),
             %% 5) Session is GC'ed, client is removed from list.
-            ?tp(notice, "gc", #{}),
-            %% simulate GC
-            ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]),
+            disconnect_and_destroy_session(C2),
             ?retry(
                 100,
                 20,
@@ -511,7 +513,7 @@ t_persistent_sessions5(Config) ->
                 list_request(#{limit => 2, page => 1}, Config)
             ),
             %% Disconnect persistent sessions
-            lists:foreach(fun emqtt:stop/1, [C1, C2]),
+            lists:foreach(fun stop_and_commit/1, [C1, C2]),
 
             P3 =
                 ?retry(200, 10, begin

+ 23 - 6
apps/emqx_message_transformation/src/emqx_message_transformation_http_api.erl

@@ -39,7 +39,8 @@
 -define(METRIC_NAME, message_transformation).
 
 -type user_property() :: #{binary() => binary()}.
--reflect_type([user_property/0]).
+-type publish_properties() :: #{binary() => binary() | integer()}.
+-reflect_type([user_property/0, publish_properties/0]).
 
 %%-------------------------------------------------------------------------------------------------
 %% `minirest' and `minirest_trails' API
@@ -305,7 +306,14 @@ fields(dryrun_input_message) ->
     %% See `emqx_message_transformation:eval_context()'.
     [
         {client_attrs, mk(map(), #{default => #{}})},
+        {clientid, mk(binary(), #{default => <<"test-clientid">>})},
         {payload, mk(binary(), #{required => true})},
+        {peername, mk(emqx_schema:ip_port(), #{default => <<"127.0.0.1:19872">>})},
+        {pub_props,
+            mk(
+                typerefl:alias("map()", publish_properties()),
+                #{default => #{}}
+            )},
         {qos, mk(range(0, 2), #{default => 0})},
         {retain, mk(boolean(), #{default => false})},
         {topic, mk(binary(), #{required => true})},
@@ -313,7 +321,8 @@ fields(dryrun_input_message) ->
             mk(
                 typerefl:alias("map(binary(), binary())", user_property()),
                 #{default => #{}}
-            )}
+            )},
+        {username, mk(binary(), #{required => false})}
     ];
 fields(get_metrics) ->
     [
@@ -728,26 +737,34 @@ dryrun_input_message_in(Params) ->
         ),
     #{
         client_attrs := ClientAttrs,
+        clientid := ClientId,
         payload := Payload,
+        peername := Peername,
+        pub_props := PublishProperties,
         qos := QoS,
         retain := Retain,
         topic := Topic,
         user_property := UserProperty0
     } = Message0,
+    Username = maps:get(username, Message0, undefined),
     UserProperty = maps:to_list(UserProperty0),
     Message1 = #{
         id => emqx_guid:gen(),
         timestamp => emqx_message:timestamp_now(),
         extra => #{},
-        from => <<"test-clientid">>,
-
-        flags => #{retain => Retain},
+        from => ClientId,
+        flags => #{dup => false, retain => Retain},
         qos => QoS,
         topic => Topic,
         payload => Payload,
         headers => #{
             client_attrs => ClientAttrs,
-            properties => #{'User-Property' => UserProperty}
+            peername => Peername,
+            properties => maps:merge(
+                PublishProperties,
+                #{'User-Property' => UserProperty}
+            ),
+            username => Username
         }
     },
     Message = emqx_message:from_map(Message1),

+ 45 - 4
apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl

@@ -1875,12 +1875,26 @@ t_dryrun_transformation(_Config) ->
                 operation(retain, <<"payload.r">>),
                 operation(<<"user_property.a">>, <<"payload.u.a">>),
                 operation(<<"user_property.copy">>, <<"user_property.original">>),
-                operation(<<"payload">>, <<"payload.p.hello">>)
+                operation(<<"payload.user">>, <<"username">>),
+                operation(<<"payload.flags">>, <<"flags">>),
+                operation(<<"payload.pprops">>, <<"pub_props">>),
+                operation(<<"payload.expiry">>, <<"pub_props.Message-Expiry-Interval">>),
+                operation(<<"payload.peername">>, <<"peername">>),
+                operation(<<"payload.node">>, <<"node">>),
+                operation(<<"payload.id">>, <<"id">>),
+                operation(<<"payload.clientid">>, <<"clientid">>),
+                operation(<<"payload.now">>, <<"timestamp">>),
+                operation(<<"payload.recv_at">>, <<"publish_received_at">>),
+                operation(<<"payload.hi">>, <<"payload.p.hello">>)
             ],
             Transformation1 = transformation(Name1, Operations),
 
             %% Good input
+            ClientId = <<"myclientid">>,
+            Username = <<"myusername">>,
+            Peername = <<"10.0.50.1:63221">>,
             Message1 = dryrun_input_message(#{
+                clientid => ClientId,
                 payload => #{
                     p => #{<<"hello">> => <<"world">>},
                     q => 1,
@@ -1888,11 +1902,15 @@ t_dryrun_transformation(_Config) ->
                     t => <<"t">>,
                     u => #{a => <<"b">>}
                 },
-                user_property => #{<<"original">> => <<"user_prop">>}
+                peername => Peername,
+                pub_props => #{<<"Message-Expiry-Interval">> => 30},
+                user_property => #{<<"original">> => <<"user_prop">>},
+                username => Username
             }),
+            Res1 = dryrun_transformation(Transformation1, Message1),
             ?assertMatch(
                 {200, #{
-                    <<"payload">> := <<"\"world\"">>,
+                    <<"payload">> := _,
                     <<"qos">> := 1,
                     <<"retain">> := true,
                     <<"topic">> := <<"t/u/v/t">>,
@@ -1902,7 +1920,30 @@ t_dryrun_transformation(_Config) ->
                         <<"copy">> := <<"user_prop">>
                     }
                 }},
-                dryrun_transformation(Transformation1, Message1)
+                Res1
+            ),
+            {200, #{<<"payload">> := EncPayloadRes1}} = Res1,
+            NodeBin = atom_to_binary(node()),
+            ?assertMatch(
+                #{
+                    <<"hi">> := <<"world">>,
+                    <<"now">> := _,
+                    <<"recv_at">> := _,
+                    <<"clientid">> := ClientId,
+                    <<"pprops">> := #{
+                        <<"Message-Expiry-Interval">> := 30,
+                        <<"User-Property">> := #{
+                            <<"original">> := <<"user_prop">>
+                        }
+                    },
+                    <<"expiry">> := 30,
+                    <<"peername">> := Peername,
+                    <<"node">> := NodeBin,
+                    <<"id">> := <<_/binary>>,
+                    <<"flags">> := #{<<"dup">> := false, <<"retain">> := true},
+                    <<"user">> := Username
+                },
+                emqx_utils_json:decode(EncPayloadRes1, [return_maps])
             ),
 
             %% Bad input: fails to decode

+ 16 - 7
apps/emqx_rule_engine/src/emqx_rule_actions.erl

@@ -83,7 +83,8 @@ pre_process_action_args(
         retain := Retain,
         payload := Payload,
         mqtt_properties := MQTTProperties,
-        user_properties := UserProperties
+        user_properties := UserProperties,
+        direct_dispatch := DirectDispatch
     } = Args
 ) ->
     Args#{
@@ -93,7 +94,8 @@ pre_process_action_args(
             retain => parse_simple_var(Retain),
             payload => parse_payload(Payload),
             mqtt_properties => parse_mqtt_properties(MQTTProperties),
-            user_properties => parse_user_properties(UserProperties)
+            user_properties => parse_user_properties(UserProperties),
+            direct_dispatch => parse_simple_var(DirectDispatch)
         }
     };
 pre_process_action_args(_, Args) ->
@@ -153,7 +155,8 @@ republish(
             topic := TopicTemplate,
             payload := PayloadTemplate,
             mqtt_properties := MQTTPropertiesTemplate,
-            user_properties := UserPropertiesTemplate
+            user_properties := UserPropertiesTemplate,
+            direct_dispatch := DirectDispatchTemplate
         }
     }
 ) ->
@@ -164,6 +167,7 @@ republish(
     Payload = iolist_to_binary(PayloadString),
     QoS = render_simple_var(QoSTemplate, Selected, 0),
     Retain = render_simple_var(RetainTemplate, Selected, false),
+    DirectDispatch = render_simple_var(DirectDispatchTemplate, Selected, false),
     %% 'flags' is set for message re-publishes or message related
     %% events such as message.acked and message.dropped
     Flags0 = maps:get(flags, Env, #{}),
@@ -175,7 +179,8 @@ republish(
         flags => Flags,
         topic => Topic,
         payload => Payload,
-        pub_props => PubProps
+        pub_props => PubProps,
+        direct_dispatch => DirectDispatch
     },
     case logger:get_process_metadata() of
         #{action_id := ActionID} ->
@@ -190,7 +195,7 @@ republish(
         "republish_message",
         TraceInfo
     ),
-    safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps).
+    safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps, DirectDispatch).
 
 %%--------------------------------------------------------------------
 %% internal functions
@@ -232,7 +237,7 @@ pre_process_args(Mod, Func, Args) ->
         false -> Args
     end.
 
-safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) ->
+safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps, DirectDispatch) ->
     Msg = #message{
         id = emqx_guid:gen(),
         qos = QoS,
@@ -246,7 +251,11 @@ safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) ->
         payload = Payload,
         timestamp = erlang:system_time(millisecond)
     },
-    case emqx_broker:safe_publish(Msg, #{hook_prohibition_as_error => true}) of
+    case
+        emqx_broker:safe_publish(Msg, #{
+            bypass_hook => DirectDispatch, hook_prohibition_as_error => true
+        })
+    of
         Routes when is_list(Routes) ->
             emqx_metrics:inc_msg(Msg),
             ok;

+ 13 - 5
apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl

@@ -144,7 +144,7 @@ fields("republish_args") ->
     [
         {topic,
             ?HOCON(
-                binary(),
+                emqx_schema:template(),
                 #{
                     desc => ?DESC("republish_args_topic"),
                     required => true,
@@ -162,7 +162,7 @@ fields("republish_args") ->
             )},
         {retain,
             ?HOCON(
-                hoconsc:union([boolean(), binary()]),
+                hoconsc:union([boolean(), emqx_schema:template()]),
                 #{
                     desc => ?DESC("republish_args_retain"),
                     default => <<"${retain}">>,
@@ -171,7 +171,7 @@ fields("republish_args") ->
             )},
         {payload,
             ?HOCON(
-                binary(),
+                emqx_schema:template(),
                 #{
                     desc => ?DESC("republish_args_payload"),
                     default => <<"${payload}">>,
@@ -188,12 +188,20 @@ fields("republish_args") ->
             )},
         {user_properties,
             ?HOCON(
-                binary(),
+                emqx_schema:template(),
                 #{
                     desc => ?DESC("republish_args_user_properties"),
                     default => <<"${user_properties}">>,
                     example => <<"${pub_props.'User-Property'}">>
                 }
+            )},
+        {direct_dispatch,
+            ?HOCON(
+                hoconsc:union([boolean(), emqx_schema:template()]),
+                #{
+                    desc => ?DESC("republish_args_direct_dispatch"),
+                    default => false
+                }
             )}
     ];
 fields("republish_mqtt_properties") ->
@@ -263,7 +271,7 @@ actions() ->
     end.
 
 qos() ->
-    hoconsc:union([emqx_schema:qos(), binary()]).
+    hoconsc:union([emqx_schema:qos(), emqx_schema:template()]).
 
 rule_engine_settings() ->
     [

+ 1 - 3
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -1289,9 +1289,7 @@ columns_example_props_specific(unsub_props) ->
 
 columns_example_client_attrs() ->
     {<<"client_attrs">>, #{
-        <<"client_attrs">> => #{
-            <<"test">> => <<"example">>
-        }
+        <<"test">> => <<"example">>
     }}.
 
 %%--------------------------------------------------------------------

+ 6 - 0
apps/emqx_rule_engine/src/emqx_rule_funcs.erl

@@ -140,6 +140,8 @@
     ltrim/1,
     reverse/1,
     rtrim/1,
+    rtrim/2,
+    rm_prefix/2,
     strlen/1,
     substr/2,
     substr/3,
@@ -795,6 +797,10 @@ reverse(S) -> emqx_variform_bif:reverse(S).
 
 rtrim(S) -> emqx_variform_bif:rtrim(S).
 
+rtrim(S, Chars) -> emqx_variform_bif:rtrim(S, Chars).
+
+rm_prefix(S, Prefix) -> emqx_variform_bif:rm_prefix(S, Prefix).
+
 strlen(S) -> emqx_variform_bif:strlen(S).
 
 substr(S, Start) -> emqx_variform_bif:substr(S, Start).

+ 42 - 1
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -81,6 +81,7 @@ groups() ->
             t_sqlselect_1,
             t_sqlselect_2,
             t_sqlselect_3,
+            t_direct_dispatch,
             t_sqlselect_message_publish_event_keep_original_props_1,
             t_sqlselect_message_publish_event_keep_original_props_2,
             t_sqlselect_missing_template_vars_render_as_undefined,
@@ -1996,6 +1997,42 @@ t_sqlselect_3(_Config) ->
     emqtt:stop(Client),
     delete_rule(TopicRule).
 
+%% select from t/1, republish to t/1, no dead-loop expected
+%% i.e. payload is mutated once and only once
+t_direct_dispatch(_Config) ->
+    SQL = "SELECT * FROM \"t/1\"",
+    Repub = republish_action(
+        <<"t/1">>,
+        <<"republished: ${payload}">>,
+        <<"${user_properties}">>,
+        #{},
+        true
+    ),
+    {ok, Rule} = emqx_rule_engine:create_rule(
+        #{
+            sql => SQL,
+            id => ?TMP_RULEID,
+            actions => [Repub]
+        }
+    ),
+    {ok, Pub} = emqtt:start_link([{clientid, <<"pubclient">>}]),
+    {ok, _} = emqtt:connect(Pub),
+    {ok, Sub} = emqtt:start_link([{clientid, <<"subclient">>}]),
+    {ok, _} = emqtt:connect(Sub),
+    {ok, _, _} = emqtt:subscribe(Sub, <<"t/1">>, 0),
+    Payload = base64:encode(crypto:strong_rand_bytes(12)),
+    emqtt:publish(Pub, <<"t/1">>, Payload, 0),
+    receive
+        {publish, #{topic := T, payload := Payload1}} ->
+            ?assertEqual(<<"t/1">>, T),
+            ?assertEqual(<<"republished: ", Payload/binary>>, Payload1)
+    after 2000 ->
+        ct:fail(wait_for_t2)
+    end,
+    emqtt:stop(Pub),
+    emqtt:stop(Sub),
+    delete_rule(Rule).
+
 t_sqlselect_message_publish_event_keep_original_props_1(_Config) ->
     %% republish the client.connected msg
     Topic = <<"foo/bar/1">>,
@@ -3960,6 +3997,9 @@ republish_action(Topic, Payload, UserProperties) ->
     republish_action(Topic, Payload, UserProperties, _MQTTProperties = #{}).
 
 republish_action(Topic, Payload, UserProperties, MQTTProperties) ->
+    republish_action(Topic, Payload, UserProperties, MQTTProperties, false).
+
+republish_action(Topic, Payload, UserProperties, MQTTProperties, DirectDispatch) ->
     #{
         function => republish,
         args => #{
@@ -3968,7 +4008,8 @@ republish_action(Topic, Payload, UserProperties, MQTTProperties) ->
             qos => 0,
             retain => false,
             mqtt_properties => MQTTProperties,
-            user_properties => UserProperties
+            user_properties => UserProperties,
+            direct_dispatch => DirectDispatch
         }
     }.
 

+ 2 - 1
apps/emqx_schema_validation/test/emqx_schema_validation_http_api_SUITE.erl

@@ -1499,7 +1499,8 @@ t_republish_action_failure(_Config) ->
                                 <<"qos">> => 0,
                                 <<"retain">> => false,
                                 <<"topic">> => <<"t/republished">>,
-                                <<"user_properties">> => <<>>
+                                <<"user_properties">> => <<>>,
+                                <<"direct_dispatch">> => false
                             }
                     }
                 ]

+ 12 - 0
apps/emqx_utils/src/emqx_variform_bif.erl

@@ -25,6 +25,7 @@
     reverse/1,
     rtrim/1,
     rtrim/2,
+    rm_prefix/2,
     strlen/1,
     substr/2,
     substr/3,
@@ -107,6 +108,17 @@ rtrim(S) when is_binary(S) ->
 rtrim(S, Chars) when is_binary(S) ->
     string:trim(S, trailing, Chars).
 
+%% @doc Remove the prefix of a string if there is a match.
+%% The original stirng is returned if there is no match.
+rm_prefix(S, Prefix) ->
+    Size = size(Prefix),
+    case S of
+        <<P:Size/binary, Rem/binary>> when P =:= Prefix ->
+            Rem;
+        _ ->
+            S
+    end.
+
 strlen(S) when is_binary(S) ->
     string:length(S).
 

+ 4 - 0
changes/ce/feat-13516.en.md

@@ -0,0 +1,4 @@
+Add a `direct_dispatch` argument for `republish` action.
+
+When `direct_dispatch` is set to `true` (or rendered as `true` from template) the message will be directly dispatched to subscribers.
+This can be used to avoid to triggering other rules or recursively trigger the self-rule.

+ 1 - 1
deploy/docker/Dockerfile

@@ -1,4 +1,4 @@
-ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/5.3-9:1.15.7-26.2.5-3-debian12
+ARG BUILD_FROM=ghcr.io/emqx/emqx-builder/5.3-11:1.15.7-26.2.5.2-1-debian12
 ARG RUN_FROM=public.ecr.aws/debian/debian:stable-20240612-slim
 ARG SOURCE_TYPE=src # tgz
 

+ 2 - 2
env.sh

@@ -1,6 +1,6 @@
 # https://github.com/emqx/emqx-builder
-export EMQX_BUILDER_VSN=5.3-9
-export OTP_VSN=26.2.5-3
+export EMQX_BUILDER_VSN=5.3-11
+export OTP_VSN=26.2.5.2-1
 export ELIXIR_VSN=1.15.7
 export EMQX_BUILDER=ghcr.io/emqx/emqx-builder/${EMQX_BUILDER_VSN}:${ELIXIR_VSN}-${OTP_VSN}-ubuntu22.04
 export EMQX_DOCKER_BUILD_FROM=ghcr.io/emqx/emqx-builder/${EMQX_BUILDER_VSN}:${ELIXIR_VSN}-${OTP_VSN}-debian12

+ 6 - 1
mix.exs

@@ -463,7 +463,12 @@ defmodule EMQXUmbrella.MixProject do
       {:d, :snk_kind, :msg}
     ] ++
       singleton(test_env?(), {:d, :TEST}) ++
-      singleton(not enable_quicer?(), {:d, :BUILD_WITHOUT_QUIC})
+      singleton(not enable_quicer?(), {:d, :BUILD_WITHOUT_QUIC}) ++
+      singleton(store_state_in_ds?(), {:d, :STORE_STATE_IN_DS, true})
+  end
+
+  defp store_state_in_ds?() do
+    "1" == System.get_env("STORE_STATE_IN_DS")
   end
 
   defp singleton(false, _value), do: []

+ 1 - 0
rebar.config.erl

@@ -223,6 +223,7 @@ common_compile_opts(Edition, _RelType, Vsn) ->
         {d, 'EMQX_RELEASE_EDITION', Edition}
     ] ++
         [{d, 'EMQX_BENCHMARK'} || os:getenv("EMQX_BENCHMARK") =:= "1"] ++
+        [{d, 'STORE_STATE_IN_DS'} || os:getenv("STORE_STATE_IN_DS") =:= "1"] ++
         [{d, 'BUILD_WITHOUT_QUIC'} || not is_quicer_supported()].
 
 warn_profile_env() ->

+ 17 - 0
rel/i18n/emqx_rule_engine_schema.hocon

@@ -113,6 +113,23 @@ Placeholders like <code>${.payload.content_type}</code> may be used."""
 republish_args_mqtt_properties.label:
 """MQTT Properties"""
 
+republish_args_direct_dispatch.desc:
+"""Enable direct dispatch to subscribers without initiating a new message publish event.
+When set to `true`, this prevents the recursive processing of a message by the same action
+and is used when the output message does not require further processing.
+
+However, enabling this feature has several limitations:
+
+- The output message from this action is not retained.
+- It does not trigger other rules that operate based on the output topic of this action.
+- It does not activate rules that select from the `$events/message_publish`.
+- It does not trigger plugins that use the `'message.publish'` hook.
+- Topic metrics are not collected for the output message of this action.
+- Message schema validation is not applied (feature of EMQX Enterprise).
+- Message transformation processes are not applied (feature of EMQX Enterprise)."""
+
+republish_args_direct_dispatch.label: "Direct Dispatch"
+
 republish_function.desc:
 """Republish the message as a new MQTT message"""